Posted to user@beam.apache.org by Lars BK <la...@gmail.com> on 2017/07/28 12:18:50 UTC

Reading and writing to external services in DoFns

Hi everyone,


I'm researching how to handle a particular use case in Beam that I imagine
is common, but that I haven't been able to find any agreed upon best way of
doing yet.

*Use case: *I'm processing a stream or batch of records with ids, and for
each record I want to check whether I've ever seen its id before (beyond
the scope of the job execution). In particular, I'm going to be using
Google Dataflow, and I plan to store and look up ids in Google Datastore.

*Question*: Is it advisable to look up the record id in Datastore per
element in a DoFn? I am most worried about latency, and I am wary of the
recommendation in the documentation for ParDo
<https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/transforms/ParDo.html>
that says I'd have to be careful when I write to Datastore:

> "..if a DoFn's
<https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/transforms/DoFn.html>
execution has external side-effects, such as performing updates to external
HTTP services, then the DoFn's
<https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/transforms/DoFn.html>
code needs to take care to ensure that those updates are idempotent and
that concurrent updates are acceptable."

I found a relevant question on StackOverflow
<https://stackoverflow.com/questions/40049621/datastore-queries-in-dataflow-dofn-slow-down-pipeline-when-run-in-the-cloud>
where a user is doing something very similar to what I had in mind, and
another user says that:

> "For each partition of your PCollection the calls to Datastore are going
to be single-threaded, hence incur a lot of latency."

Is this something I should be worried about, and if so, does anyone know of
a better way? The second suggestion from the same user is to read all the
ids from Datastore and use a CoGroupByKey, but I don't think that approach
would support streaming mode.
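To make the CoGroupByKey idea concrete, here is a plain-Python sketch of
the join semantics (this is not Beam code; the function name and record
shape are invented for illustration): group the previously seen ids and
the incoming records by id, then emit only records whose group has no
entry on the "seen" side.

```python
from collections import defaultdict

def cogroup_dedup(known_ids, records):
    """Simulate the CoGroupByKey approach: group previously seen ids
    and incoming records by id, then keep only records whose id has
    no match on the 'seen' side of the group."""
    grouped = defaultdict(lambda: {"seen": [], "records": []})
    for record_id in known_ids:
        grouped[record_id]["seen"].append(record_id)
    for record in records:
        grouped[record["id"]]["records"].append(record)
    # Emit only records from groups with an empty 'seen' side.
    return [r for g in grouped.values() if not g["seen"] for r in g["records"]]
```

In Beam this grouping would happen per window, which is part of why the
approach is awkward to carry over to streaming as-is.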


I hope somebody here has experience with similar patterns, and I'd greatly
appreciate any tips you could share!

Regards, Lars

Re: Reading and writing to external services in DoFns

Posted by Lars BK <la...@gmail.com>.
Hi Vilhelm,

Thanks, it helps. Yes, it should work in batch. I suppose it could work in
streaming too, if one gets the windowing straight, but I haven't given much
thought to that situation yet! I'm not sure when records written at the end
of the streaming pipeline would become available for lookup at the beginning
of the pipeline, but perhaps one could leverage stateful processing to avoid
having to worry about that.
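For what it's worth, the stateful idea could look roughly like this in
plain Python (a sketch only, not Beam's state API; the function name and
state shape are made up): keep a per-id "seen" flag and drop any element
whose flag is already set.

```python
def stateful_dedup(events, state=None):
    """Per-id 'seen' flag, loosely analogous to keyed state: the first
    element for an id is emitted and flips the flag; later elements for
    the same id are dropped, including across calls if the same state
    dict is passed back in (standing in for persisted state)."""
    state = {} if state is None else state
    out = []
    for record_id, payload in events:
        if not state.get(record_id):
            state[record_id] = True
            out.append((record_id, payload))
    return out
```

The appeal is that the check never leaves the pipeline, so there is no
per-element RPC and no question of when a write becomes visible.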

Regards,
Lars

On Sat, Jul 29, 2017 at 10:53 AM Vilhelm von Ehrenheim <
vonehrenheim@gmail.com> wrote:

> Hi!
> You will get a slow pipeline if you do the DB lookups from inside a DoFn.
>
> I have done similar things to what you describe using the "read all data
> and join" approach, but only in batch settings so far. I think it should
> work fine in the streaming setting as well, as long as you keep adding to
> the set of previous records. However, if the index of previous records can
> fit into memory on the nodes, I would recommend using a side input instead
> and doing the check against it in the DoFn. That should both be fast and
> work well in streaming.
>
> Hope it helps.
>
> Br,
> Vilhelm von Ehrenheim

Re: Reading and writing to external services in DoFns

Posted by Vilhelm von Ehrenheim <vo...@gmail.com>.
Hi!
You will get a slow pipeline if you do the DB lookups from inside a DoFn.

I have done similar things to what you describe using the "read all data
and join" approach, but only in batch settings so far. I think it should
work fine in the streaming setting as well, as long as you keep adding to
the set of previous records. However, if the index of previous records can
fit into memory on the nodes, I would recommend using a side input instead
and doing the check against it in the DoFn. That should both be fast and
work well in streaming.
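As a rough illustration of the side-input approach (plain Python, not
Beam; the function name and record shape are invented): the set of known
ids is materialized once and handed to every worker, so each element
check becomes an in-memory membership test rather than an RPC.

```python
def filter_new(records, seen_ids):
    """Side-input style check: 'seen_ids' plays the role of a small
    lookup set broadcast to every worker; each element is a cheap
    in-memory membership test instead of a per-record Datastore call."""
    seen = frozenset(seen_ids)
    return [r for r in records if r["id"] not in seen]
```

The trade-off is that the side input is a snapshot: ids written after it
was computed won't be in the set until it is refreshed.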

Hope it helps.

Br,
Vilhelm von Ehrenheim
