Posted to user@beam.apache.org by Luke Cwik <lc...@google.com> on 2021/08/09 22:59:37 UTC

Re: Mapping *part* of a PCollection possible? (Lens optics for PCollection?)

Your best bet is to write a DoFn that calls Redis for each key and performs
the "join" directly, rather than using another PCollection. Redis scales
quite well for this kind of use case. If you want to ensure that there is
only one lookup per key, you'll need to use a GroupByKey before the lookup
DoFn.
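The lookup-in-a-DoFn approach can be sketched without a Beam dependency. This is a plain-Java model, not the Beam or Redis client API: `enrichPerElement` stands in for the body of a DoFn's `@ProcessElement` method, `lookup` stands in for a Redis GET (a `Map` serves as a fake store), and `enrichGrouped` mimics placing a GroupByKey ahead of the lookup so each distinct key is fetched once. All class and method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Plain-Java model (no Beam dependency) of "look up a key for each element
 * and keep the original element". Hypothetical names throughout; lookup is
 * any function, e.g. a wrapper around a Redis client's GET.
 */
final class EnrichSketch {

    /** One lookup per element; each output pairs the original element with its looked-up value. */
    static <A, V> List<Map.Entry<A, V>> enrichPerElement(
            List<A> input, Function<A, String> keyOf, Function<String, V> lookup) {
        List<Map.Entry<A, V>> out = new ArrayList<>();
        for (A a : input) {
            V value = lookup.apply(keyOf.apply(a)); // one call per element
            // SimpleImmutableEntry tolerates a null value (key missing from the store).
            out.add(new AbstractMap.SimpleImmutableEntry<>(a, value));
        }
        return out;
    }

    /** Groups elements by key first (the GroupByKey step), then looks up each distinct key once. */
    static <A, V> List<Map.Entry<A, V>> enrichGrouped(
            List<A> input, Function<A, String> keyOf, Function<String, V> lookup) {
        Map<String, List<A>> byKey = new LinkedHashMap<>();
        for (A a : input) {
            byKey.computeIfAbsent(keyOf.apply(a), k -> new ArrayList<>()).add(a);
        }
        List<Map.Entry<A, V>> out = new ArrayList<>();
        for (Map.Entry<String, List<A>> group : byKey.entrySet()) {
            V value = lookup.apply(group.getKey()); // one call per distinct key
            for (A a : group.getValue()) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(a, value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> fakeRedis = Map.of("u1", "gold", "u2", "silver");
        int[] calls = {0};
        Function<String, String> lookup = k -> { calls[0]++; return fakeRedis.get(k); };
        Function<String, String> keyOf = e -> e.substring(0, e.indexOf(':'));
        List<String> events = List.of("u1:click", "u2:view", "u1:buy");

        System.out.println(enrichPerElement(events, keyOf, lookup) + " calls=" + calls[0]);
        calls[0] = 0;
        System.out.println(enrichGrouped(events, keyOf, lookup) + " calls=" + calls[0]);
    }
}
```

With three events over two distinct keys, the per-element version makes three lookups and the grouped version makes two. Because each output already carries the original element next to the looked-up value, no second PCollection or CoGroupByKey is needed. In an actual DoFn the Redis client would usually be opened once in a `@Setup` method rather than per element.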

If the client library you're using can batch calls, you may want to look at
using the GroupIntoBatches transform[1] as well.

1:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
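To illustrate why batching helps, here is a plain-Java model (no Beam dependency) of splitting lookup keys into fixed-size batches and making one multi-key call per batch. It is only an analogy for GroupIntoBatches, which in Beam batches the values of each key of a keyed PCollection. `lookupInBatches` and `multiGet` are hypothetical names, and `multiGet` is assumed to return values in the same order as its keys, the way Redis MGET does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Plain-Java model of batched lookups. multiGet is a hypothetical stand-in
 * for a client's batch call; it must return one value per key, in key order.
 */
final class BatchLookupSketch {

    static <V> Map<String, V> lookupInBatches(
            List<String> keys, int batchSize, Function<List<String>, List<V>> multiGet) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Map<String, V> results = new LinkedHashMap<>();
        for (int start = 0; start < keys.size(); start += batchSize) {
            List<String> batch = keys.subList(start, Math.min(start + batchSize, keys.size()));
            List<V> values = multiGet.apply(batch); // one round trip per batch
            for (int i = 0; i < batch.size(); i++) {
                results.put(batch.get(i), values.get(i));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, String> fakeRedis = Map.of("a", "1", "b", "2", "c", "3", "d", "4", "e", "5");
        int[] roundTrips = {0};
        Function<List<String>, List<String>> multiGet = batch -> {
            roundTrips[0]++;
            List<String> out = new ArrayList<>();
            for (String k : batch) {
                out.add(fakeRedis.get(k)); // MGET-like: null for a missing key
            }
            return out;
        };
        Map<String, String> r = lookupInBatches(List.of("a", "b", "c", "d", "e"), 2, multiGet);
        System.out.println(r + " roundTrips=" + roundTrips[0]);
    }
}
```

Five keys with a batch size of 2 take three round trips instead of five; the batch size trades per-call latency against request size.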

On Wed, Jul 21, 2021 at 10:46 AM Vincent Marquez <vi...@gmail.com>
wrote:

>
>
>
> On Wed, Jul 21, 2021 at 10:37 AM Andrew Kettmann <ak...@evolve24.com>
> wrote:
>
>> Worth noting that you never "lose" a PCollection. You can use the same
>> PCollection in as many transforms as you like and every time you reference
>> that PCollection<A> it will be in the same state it was when you first read
>> it in.
>>
>> So if you have:
>>
>> PCollection<A> colA = ...;
>> PCollection<RedisData> redisData = colA.apply(ParDo.of(new ReadRedisDataDoFn()));
>>
>> You have not consumed the colA PCollection and can reference/use it as
>> many times as you want in further steps.
>>
>> My instinct for this is:
>>
>>
>>    1. Read Source to get PCollection<A>
>>    2. Pull the key to look up in Redis from PCollection<A> into another
>>    PCollection
>>    3. Look up with a custom DoFn if the normal IO one doesn't meet your
>>    needs
>>    4. CoGroupByKey transform to group them together
>>
>>
> I have done that; however, this doesn't really work for my use case in a
> streaming pipeline. Both PCollections need to have the same windowing,
> and under high load, if I don't want to buffer a ton of data, I might get
> outputs with one side being empty.
>
>
>>
>>    5. Do whatever else you need to do with the combined data.
>>
>>
>> ------------------------------
>> *From:* Vincent Marquez <vi...@gmail.com>
>> *Sent:* Wednesday, July 21, 2021 12:14 PM
>> *To:* user <us...@beam.apache.org>
>> *Subject:* Mapping *part* of a PCollection possible? (Lens optics for
>> PCollection?)
>>
>> Let's say I have PCollection<A> and I want to use the 'readAll' pattern
>> to enhance some data from an additional source such as Redis (which has a
>> readKeys PTransform<String, RedisData>). However, I don't want to 'lose'
>> the original A. There *are* a few ways to do this currently (side inputs,
>> joining two streams with CoGroupByKey, using State), all of which have some
>> problems.
>>
>> If I could map PCollection<A> into some type that pairs A with a String,
>> for instance PCollection<KV<A, String>>, and then use the Redis readKeys
>> to map that to PCollection<KV<A, RedisData>>, it would solve all my
>> problems. This is more or less a get/set lens optic, if anyone is familiar
>> with functional programming.
>>
>> Is something like this possible?  Could it be added?  I've run
>> into wanting this pattern numerous times over the last year.
>>
>>
>> *~Vincent*
>>
>>
>