Posted to user@beam.apache.org by Nick Pan <as...@gmail.com> on 2022/03/22 23:36:19 UTC
Best Practice to read a list of rows by keys with SpannerIO
Let's say I have a list of users in PCollection<String> - userIds. And I
want to do a "select * from table where userId=?" for each user. What is
the recommended approach for doing this in Beam?
I have seen approaches where we first turn userIds into a
PCollection<ReadOperation>, then feed them to SpannerIO. Something like
this:
PCollection<String> userIds = ...
userIds.apply(
    MapElements.into(TypeDescriptor.of(ReadOperation.class))
        // Build one ReadOperation (one SQL query) per user id.
        .via((SerializableFunction<String, ReadOperation>) userId ->
            ReadOperation.create().withQuery(
                "select * from table where user_id=" + userId)))
    .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));
But is this the most scalable approach? It looks like we will be
sending one SQL statement per user to Spanner, as opposed to issuing an
actual batch query like "select * from table where userId in {...userId
list...}".
Thanks,
Nick
Re: Best Practice to read a list of rows by keys with SpannerIO
Posted by Reuven Lax <re...@google.com>.
Another idea - key by a hash modulo N, where N represents how many users
you want in your select list. Then issue a GroupByKey, and generate the
ReadOperation corresponding to the users in each iterable.
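
A minimal sketch of the bucketing step, in plain Java so it can be run without a pipeline. The class name UserBatching, its methods, and the query text are hypothetical and not from the thread; the in-memory map only stands in for what WithKeys followed by GroupByKey would produce in Beam. Note that hashing modulo N yields N groups, so each select list holds roughly (number of users) / N ids rather than exactly N.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class UserBatching {
  // Hypothetical batch query; "table" and "user_id" are the placeholders used
  // in the question. @userIds is an array parameter holding one bucket of ids.
  static final String BATCH_QUERY =
      "SELECT * FROM table WHERE user_id IN UNNEST(@userIds)";

  // Assigns a user id to one of numBuckets buckets. floorMod keeps the result
  // non-negative even when hashCode() is negative.
  static int bucketFor(String userId, int numBuckets) {
    return Math.floorMod(userId.hashCode(), numBuckets);
  }

  // In-memory stand-in for keying by bucket and grouping by key:
  // returns bucket -> list of user ids that hashed to that bucket.
  static Map<Integer, List<String>> groupByBucket(List<String> userIds, int numBuckets) {
    Map<Integer, List<String>> buckets = new TreeMap<>();
    for (String id : userIds) {
      buckets.computeIfAbsent(bucketFor(id, numBuckets), k -> new ArrayList<>()).add(id);
    }
    return buckets;
  }

  public static void main(String[] args) {
    List<String> ids = List.of("u1", "u2", "u3", "u4", "u5");
    // One query per bucket instead of one query per user.
    groupByBucket(ids, 2).forEach((bucket, batch) ->
        System.out.println(BATCH_QUERY + "  with @userIds=" + batch));
  }
}
```

In a pipeline, the assumption is that each grouped iterable would be turned into a single ReadOperation, for example by binding the ids as an array parameter with Statement.newBuilder(BATCH_QUERY).bind("userIds").toStringArray(ids).build() and passing that Statement to ReadOperation.create().withQuery(...), before applying SpannerIO.readAll() as in the original snippet. Picking the number of buckets trades fewer, larger queries against parallelism, so it probably needs tuning for the actual table.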