Posted to user@beam.apache.org by Brian Hulette <bh...@apache.org> on 2021/03/31 17:37:26 UTC

Re: How to Parallelize Google Cloud Storage Blob Downloads with Grouping

I'm not very familiar with SDF so I can't comment on that approach.
Maybe +Boyuan Zhang <bo...@google.com> would be helpful there.

What if FileIO could yield the glob that was matched along with each file?
Then you could use that as a grouping key later on.
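
In the meantime you could approximate that by doing the match yourself
with FileSystems.match inside a DoFn, so the glob stays in scope as a
grouping key. Rough sketch (untested, names made up):

    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    /** Expands each glob itself so the glob survives as the key. */
    class MatchKeepingGlobFn
        extends DoFn<String, KV<String, MatchResult.Metadata>> {
      @ProcessElement
      public void process(@Element String glob,
          OutputReceiver<KV<String, MatchResult.Metadata>> out)
          throws Exception {
        // FileSystems.match expands the glob; emit each matched file
        // keyed by the glob it came from.
        for (MatchResult.Metadata m : FileSystems.match(glob).metadata()) {
          out.output(KV.of(glob, m));
        }
      }
    }

Every file from the same glob then shares a key, and you also know the
size of each group at match time, which might help with the completeness
problem you described.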

Brian

On Wed, Mar 24, 2021 at 7:08 AM Evan Galpin <ev...@gmail.com> wrote:

> Hi all, I’m looking for some expertise :) I realize I may not be using
> things as intended, and I welcome any feedback. I’m using the Java SDK,
> v2.28.0, with both the Direct Runner (for development) and the Dataflow
> Runner (for production). My pipeline is a streaming pipeline with Google
> PubSub as the source.
>
> TL;DR I’d like to be able to maintain a grouping of entities within a
> single PCollection element, but parallelize the fetching of those
> entities from Google Cloud Storage (GCS). PCollection<Iterable<String>>
> --> PCollection<Iterable<String>>, where each element of the starting
> PCollection is an iterable of file paths and each element of the
> resulting PCollection is an iterable of those files’ contents.
> Alternatively, PCollection<String> --> PCollection<Iterable<String>>
> would also work and perhaps even be preferable, where each element of
> the starting PCollection is a glob pattern and each element of the
> resulting PCollection is an iterable of the contents of the files which
> matched the glob.
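>
> As a type signature, the transform I’m after would look something like
> this (the transform name is made up, and the body is exactly the part I
> don’t know how to write):
>
>     import org.apache.beam.sdk.transforms.PTransform;
>     import org.apache.beam.sdk.values.PCollection;
>
>     // Hypothetical: each input element is a GCS glob; each output
>     // element is the contents of *all* files matching that glob.
>     class FetchAllMatchingFiles
>         extends PTransform<PCollection<String>,
>                            PCollection<Iterable<String>>> {
>       @Override
>       public PCollection<Iterable<String>> expand(
>           PCollection<String> globs) {
>         throw new UnsupportedOperationException("what goes here?");
>       }
>     }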
>
> My use-case is that at one point in my pipeline I have
> PCollection<String> as input. Each element of the PCollection is a GCS
> filepath glob pattern. It’s important that files which match a given
> glob be grouped together, because the contents of the files, once *all*
> files in a group have been read, need to be grouped downstream in the
> pipeline. I originally tried using FileIO.matchAll followed by a
> GroupByKey. However, the combination of matchAll, a window, and
> GroupByKey gave no guarantee that all files matching the glob would be
> read, and land in the same window, before the GroupByKey transform ran.
> It’s possible to approximate the desired result by applying a large
> WindowFn, but that is still probabilistic rather than a guarantee that
> all files will be read before grouping. Keeping latency as low as
> possible is also a main goal of my pipeline, which rules out very large
> windows.
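>
> For reference, my original attempt looked roughly like this
> (simplified; globs is the upstream PCollection<String> of patterns, and
> groupKeyFor() is a stand-in for however a group key gets derived from
> the file name, since the glob itself is no longer available by then):
>
>     import org.apache.beam.sdk.io.FileIO;
>     import org.apache.beam.sdk.transforms.DoFn;
>     import org.apache.beam.sdk.transforms.GroupByKey;
>     import org.apache.beam.sdk.transforms.ParDo;
>     import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>     import org.apache.beam.sdk.transforms.windowing.Window;
>     import org.apache.beam.sdk.values.KV;
>     import org.apache.beam.sdk.values.PCollection;
>     import org.joda.time.Duration;
>
>     PCollection<KV<String, Iterable<String>>> grouped = globs
>         .apply(FileIO.matchAll())
>         .apply(FileIO.readMatches())
>         .apply(ParDo.of(
>             new DoFn<FileIO.ReadableFile, KV<String, String>>() {
>               @ProcessElement
>               public void process(@Element FileIO.ReadableFile file,
>                   OutputReceiver<KV<String, String>> out)
>                   throws Exception {
>                 // groupKeyFor() is made up for this sketch.
>                 out.output(KV.of(groupKeyFor(file),
>                     file.readFullyAsUTF8String()));
>               }
>             }))
>         .apply(Window.<KV<String, String>>into(
>             FixedWindows.of(Duration.standardMinutes(5))))
>         .apply(GroupByKey.create());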
>
> So my next plan, and the one currently in operation, was to use an
> AsyncHttpClient to fan out fetching file contents via the GCS HTTP API.
> I feel like this goes against the grain in Beam and is likely quite
> suboptimal in terms of parallelization.
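>
> Inside the @ProcessElement it looks roughly like this (auth and error
> handling stripped out; bucket and paths come from the input element):
>
>     import java.util.ArrayList;
>     import java.util.List;
>     import java.util.concurrent.CompletableFuture;
>     import org.asynchttpclient.AsyncHttpClient;
>     import org.asynchttpclient.Dsl;
>     import org.asynchttpclient.Response;
>
>     // In the real DoFn the client is created once in @Setup.
>     AsyncHttpClient client = Dsl.asyncHttpClient();
>     List<CompletableFuture<String>> pending = new ArrayList<>();
>     for (String object : paths) {
>       // GCS JSON API media download; object name assumed URL-encoded.
>       String url = "https://storage.googleapis.com/storage/v1/b/"
>           + bucket + "/o/" + object + "?alt=media";
>       pending.add(client.prepareGet(url).execute()
>           .toCompletableFuture()
>           .thenApply(Response::getResponseBody));
>     }
>     List<String> contents = new ArrayList<>();
>     for (CompletableFuture<String> f : pending) {
>       contents.add(f.join());  // blocks until every download completes
>     }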
>
> So I’ve started investigating SplittableDoFn. My current plan is to
> allow splitting such that each entity in the input Iterable (i.e. each
> matched file from the glob pattern) could be processed separately. The
> challenge I’ve encountered is: how do I go about grouping/gathering the
> split invocations back into a single output value in my DoFn? I tried
> using stateful processing with a BagState to collect file contents
> along the way, but I realized partway through that the @ProcessElement
> method of a splittable DoFn may only accept ProcessContext and
> restriction-related arguments, and no others, so there is no way to
> pass @StateId args referring to a StateSpec.
>
> I noticed in the FilePatternWatcher example in the official SDF
> proposal doc[1] that a custom tracker was created wherein FilePath
> objects are kept in a set and presumably added to the set via tryClaim.
> This seems as though it could work for my use-case, but I don’t
> see/understand how to go about implementing a @SplitRestriction method
> using a custom RestrictionTracker.
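>
> My best guess at the splitting half, leaning on the built-in
> OffsetRange (whose default OffsetRangeTracker would sidestep the custom
> tracker), with download() as a stand-in for the actual GCS fetch:
>
>     import java.util.List;
>     import org.apache.beam.sdk.io.range.OffsetRange;
>     import org.apache.beam.sdk.transforms.DoFn;
>     import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
>
>     class FetchFilesFn extends DoFn<List<String>, String> {
>       @GetInitialRestriction
>       public OffsetRange initialRestriction(@Element List<String> paths) {
>         // One index per file path in the incoming group.
>         return new OffsetRange(0, paths.size());
>       }
>
>       @SplitRestriction
>       public void splitRestriction(@Element List<String> paths,
>           @Restriction OffsetRange range,
>           OutputReceiver<OffsetRange> out) {
>         // One sub-range per file so each fetch can run independently.
>         for (long i = range.getFrom(); i < range.getTo(); i++) {
>           out.output(new OffsetRange(i, i + 1));
>         }
>       }
>
>       @ProcessElement
>       public void process(@Element List<String> paths,
>           RestrictionTracker<OffsetRange, Long> tracker,
>           OutputReceiver<String> out) {
>         for (long i = tracker.currentRestriction().getFrom();
>             tracker.tryClaim(i); i++) {
>           out.output(download(paths.get((int) i)));
>         }
>       }
>
>       private static String download(String path) {
>         return path;  // placeholder for the real GCS read
>       }
>     }
>
> But even with that, the outputs are per-file, and I’m back to the
> original problem of gathering them into one element per group.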
>
> I would be very appreciative if anyone were able to offer advice. I have
> no preference for any particular solution, only that I want to achieve the
> ability to maintain a grouping of entities within a single PCollection
> element, but parallelize the fetching of those entities from Google Cloud
> Storage (GCS).
>
> Thanks!
> Evan
>
> [1]
> https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#heading=h.19qhdetat7d9
>