Posted to user@beam.apache.org by Nick Pan <as...@gmail.com> on 2022/03/22 23:36:19 UTC

Best Practice to read a list of rows by keys with SpannerIO

Let's say I have a list of users in a PCollection<String>, userIds, and I
want to do a "select * from table where userId=?" for each user. What is
the recommended approach for doing this in Beam?

I have seen approaches where we first turn userIds into a
PCollection<ReadOperation>, then feed them to SpannerIO. Something like
this:

PCollection<String> userIds = ...

userIds.apply(
    MapElements.into(TypeDescriptor.of(ReadOperation.class))
        // One ReadOperation, and therefore one query, per user id.
        .via((SerializableFunction<String, ReadOperation>) userId ->
            ReadOperation.create().withQuery(
                "select * from table where user_id=" + userId)))
    .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

But is this the most scalable approach? It looks like we will be sending
one SQL statement per user to Spanner, as opposed to doing an actual
batch query like "select * from table where userId in {...userId
list...}".

Thanks,
Nick

Re: Best Practice to read a list of rows by keys with SpannerIO

Posted by Reuven Lax <re...@google.com>.
Another idea - key by a hash modulo N, where N is the number of groups,
chosen so that each group holds about as many users as you want in your
select list. Then issue a GroupByKey, and generate the ReadOperation
corresponding to the users in each iterable.
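
A rough sketch of the bucketing idea, in plain Java so it runs without
Beam or Spanner on the classpath. The class name UserIdBatcher, its
helper methods, and the query text using IN UNNEST(@userIds) are
assumptions made for illustration, not something from this thread. In an
actual pipeline the grouping would presumably be a keying step followed by
GroupByKey, with each iterable turned into a single ReadOperation that
binds the ids as an array parameter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: illustrates "key by a hash modulo N, then group".
final class UserIdBatcher {

  // Assumed query shape: one parameterized statement per group of ids.
  // "table" and "user_id" are the placeholders used in the question.
  static final String BATCH_QUERY =
      "SELECT * FROM table WHERE user_id IN UNNEST(@userIds)";

  // Assigns a user id to one of numBuckets groups. Math.floorMod keeps the
  // result in [0, numBuckets) even when hashCode() is negative.
  static int bucketOf(String userId, int numBuckets) {
    return Math.floorMod(userId.hashCode(), numBuckets);
  }

  // Stands in for the GroupByKey step: collects the ids of each bucket.
  static Map<Integer, List<String>> groupByBucket(List<String> userIds, int numBuckets) {
    Map<Integer, List<String>> buckets = new TreeMap<>();
    for (String id : userIds) {
      buckets.computeIfAbsent(bucketOf(id, numBuckets), k -> new ArrayList<>()).add(id);
    }
    return buckets;
  }

  public static void main(String[] args) {
    List<String> userIds = Arrays.asList("u1", "u2", "u3", "u4", "u5", "u6");
    // With 2 buckets, at most 2 queries are issued instead of 6.
    groupByBucket(userIds, 2).forEach((bucket, ids) ->
        System.out.println("bucket " + bucket + ": " + BATCH_QUERY + " with @userIds=" + ids));
  }
}
```

Since N is the number of buckets, each select list ends up with roughly
(number of users) / N ids, so N would be picked from the desired list size
and the expected number of users.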
