Posted to user@beam.apache.org by Pawel <pa...@o2.pl> on 2022/03/17 06:12:46 UTC

Processing multiple files

Hi,

I'm new to Apache Beam. I'm going to read and filter data from
CommonCrawl. The recent CommonCrawl dump contains 72000 files, which are
listed in
https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2022-05/wat.paths.gz .
I want to build a Beam pipeline to read all the files. Unfortunately, I am
stuck on parallelizing this read on top of GCP Dataflow. My code is very
simple, but it seems I'm limited to just one worker on the step that is
responsible for passing files to be fetched from AWS. I thought that this
is maybe because Beam tries to split my work into bundles, but because
there are only 72000 small strings (links/URLs), the bundle is so small
that it goes to a single worker. Any ideas how I can overcome this problem?
Thanks in advance. I will appreciate all the help.

type extractFn struct {
}

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
  emit(line)
}

type fetchFileFn struct {
}

func (f *fetchFileFn) ProcessElement(ctx context.Context, url string, emit func(string, string)) {
  if url == "" {
    return
  }

  fullURL := "https://commoncrawl.s3.amazonaws.com/" + strings.TrimSpace(url)
  buf, err := gzipio.ReadHTTP(ctx, fullURL)
  warcCounter.Inc(ctx, 1)
  if err == nil {
    for {
      record, err := warc.ReadWARCRecord(buf)
      if err != nil {
        warcCounter.Dec(ctx, 1)
        return
      }
      warcLinksCounter.Inc(ctx, 1)
      emit(record.URL, record.Body)
    }
  }
  warcCounter.Dec(ctx, 1)
}

func main() {
  p := beam.NewPipeline()
  s := p.Root()
  urls := gzipio.ReadHTTPBeam(s, *input)
  wetSrc := beam.ParDo(s, &extractFn{}, urls)
  _ := beam.ParDo(s, &fetchFileFn{}, wetSrc)
  beamx.Run(context.Background(), p)
}

--
Paweł Róg

Re: Processing multiple files

Posted by Luke Cwik <lc...@google.com>.
Cham, Kerry and Jack are on point here.

Reshuffle materializes the whole PCollection and redistributes it across
multiple workers. This is typically expensive, so a splittable DoFn is a
better fit when data sizes are large. When data sizes are small, as they
are here, reshuffle is a great solution, since the engineering cost of
implementing a splittable DoFn is higher.

Flink only supports initial splitting and doesn't support dynamic work
rebalancing. To my knowledge, Dataflow is the only runner that does.

On Tue, Mar 22, 2022 at 8:29 AM Janek Bevendorff <
janek.bevendorff@uni-weimar.de> wrote:

> Side note: The Python classes I linked implement SDFs, which still don't
> work on Flink, hence they still need a shuffle despite being splittable.
> Would be interesting to see if it works on Dataflow (just the splitting,
> without the explicit shuffle), but I cannot test it myself. So if anyone
> wants to give it a try, I'd be interested in the results.
>
> Janek
>
> On 22/03/2022 16:19, Jack McCluskey wrote:
> > My gut instinct is that Cham's reshuffle suggestion is going to be the
> > better solution in this case since the input bundle is rather small,
> > but yes an SDF has some utility here if you wanted to go the extra
> > mile and let the runner split up the bundle. I don't have enough SDF
> > experience to say whether or not that extra effort would parallelize
> > the reads on the runner side though.
>
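
To make the distinction between initial splitting and dynamic work
rebalancing concrete, here is a toy model in plain Go. It is only a sketch
and does not use the Beam SDK: the restriction type and both functions are
hypothetical names chosen for illustration, and a real splittable DoFn
would rely on the SDK's own restriction and tracker types instead.

```go
package main

import "fmt"

// restriction is a half-open range [Start, End) of work,
// for example byte offsets within one input file. Hypothetical type.
type restriction struct {
	Start, End int64
}

// splitInitial models initial splitting: before any work starts, the range
// is cut into consecutive pieces of at most size units each.
func splitInitial(r restriction, size int64) []restriction {
	if size <= 0 || r.End <= r.Start {
		return []restriction{r}
	}
	var parts []restriction
	for s := r.Start; s < r.End; s += size {
		e := s + size
		if e > r.End {
			e = r.End
		}
		parts = append(parts, restriction{s, e})
	}
	return parts
}

// splitRemainder models dynamic work rebalancing: while a piece is being
// processed and pos is the first unprocessed offset, the worker keeps the
// first half of the remaining work and hands the second half to someone else.
func splitRemainder(r restriction, pos int64) (primary, residual restriction) {
	if pos < r.Start {
		pos = r.Start
	}
	if pos >= r.End {
		// Nothing left to hand off.
		return r, restriction{r.End, r.End}
	}
	mid := pos + (r.End-pos)/2
	return restriction{r.Start, mid}, restriction{mid, r.End}
}

func main() {
	for _, p := range splitInitial(restriction{0, 250}, 100) {
		fmt.Printf("[%d, %d)\n", p.Start, p.End)
	}
	// Prints:
	// [0, 100)
	// [100, 200)
	// [200, 250)

	primary, residual := splitRemainder(restriction{0, 250}, 50)
	fmt.Printf("keep [%d, %d), hand off [%d, %d)\n",
		primary.Start, primary.End, residual.Start, residual.End)
	// Prints: keep [0, 150), hand off [150, 250)
}
```

A runner that only supports initial splitting gets the first behaviour
once, up front. Dynamic rebalancing corresponds to the second function
being called repeatedly while the work is in flight.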

Re: Processing multiple files

Posted by Janek Bevendorff <ja...@uni-weimar.de>.
Side note: The Python classes I linked implement SDFs, which still don't 
work on Flink, hence they still need a shuffle despite being splittable. 
Would be interesting to see if it works on Dataflow (just the splitting, 
without the explicit shuffle), but I cannot test it myself. So if anyone 
wants to give it a try, I'd be interested in the results.

Janek

On 22/03/2022 16:19, Jack McCluskey wrote:
> My gut instinct is that Cham's reshuffle suggestion is going to be the 
> better solution in this case since the input bundle is rather small, 
> but yes an SDF has some utility here if you wanted to go the extra 
> mile and let the runner split up the bundle. I don't have enough SDF 
> experience to say whether or not that extra effort would parallelize 
> the reads on the runner side though.

Re: Processing multiple files

Posted by Jack McCluskey <jr...@google.com>.
My gut instinct is that Cham's reshuffle suggestion is going to be the
better solution in this case since the input bundle is rather small, but
yes an SDF has some utility here if you wanted to go the extra mile and let
the runner split up the bundle. I don't have enough SDF experience to say
whether or not that extra effort would parallelize the reads on the runner
side though.

On Tue, Mar 22, 2022 at 9:11 AM Kerry Donny-Clark <ke...@google.com>
wrote:

> +Jack McCluskey <jr...@google.com>
> Is this a good use case for splittable doFn?
>
> Kerry
>
> On Tue, Mar 22, 2022, 5:21 AM Janek Bevendorff <
> janek.bevendorff@uni-weimar.de> wrote:
>
>>
>>> Probably you just need to break fusion by introducing a Reshuffle [1]
>>> transform after the ReadHTTPBeam step. The way the pipeline is structured
>>> currently, Dataflow will fuse everything into a single step and will run in
>>> a single worker.
>>
>> That's most likely the issue. I had the same thing on Flink and assumed
>> that Dataflow would support auto workload redistribution. So this seems to
>> be a general restriction and not specific to Flink after all? This is
>> definitely something that needs to be fixed urgently.
>>
>> I solved it on our end by writing custom input transforms for our
>> processing library. It's in Python, not Go, but perhaps it's still useful
>> to you. Here's the one for reading WARC files:
>> https://github.com/chatnoir-eu/chatnoir-resiliparse/blob/develop/resiliparse/resiliparse/beam/warcio.py
>>
>> General documentation:
>> https://resiliparse.chatnoir.eu/en/stable/api/beam/warcio.html
>>
>> Janek
>>
>

Re: Processing multiple files

Posted by Kerry Donny-Clark <ke...@google.com>.
+Jack McCluskey <jr...@google.com>
Is this a good use case for splittable doFn?

Kerry

On Tue, Mar 22, 2022, 5:21 AM Janek Bevendorff <
janek.bevendorff@uni-weimar.de> wrote:

>
>> Probably you just need to break fusion by introducing a Reshuffle [1]
>> transform after the ReadHTTPBeam step. The way the pipeline is structured
>> currently, Dataflow will fuse everything into a single step and will run in
>> a single worker.
>
> That's most likely the issue. I had the same thing on Flink and assumed
> that Dataflow would support auto workload redistribution. So this seems to
> be a general restriction and not specific to Flink after all? This is
> definitely something that needs to be fixed urgently.
>
> I solved it on our end by writing custom input transforms for our
> processing library. It's in Python, not Go, but perhaps it's still useful
> to you. Here's the one for reading WARC files:
> https://github.com/chatnoir-eu/chatnoir-resiliparse/blob/develop/resiliparse/resiliparse/beam/warcio.py
>
> General documentation:
> https://resiliparse.chatnoir.eu/en/stable/api/beam/warcio.html
>
> Janek
>

Re: Processing multiple files

Posted by Janek Bevendorff <ja...@uni-weimar.de>.
> Probably you just need to break fusion by introducing a Reshuffle [1] 
> transform after the ReadHTTPBeam step. The way the pipeline is 
> structured currently, Dataflow will fuse everything into a single step 
> and will run in a single worker.
>
That's most likely the issue. I had the same thing on Flink and assumed 
that Dataflow would support auto workload redistribution. So this seems 
to be a general restriction and not specific to Flink after all? This is 
definitely something that needs to be fixed urgently.

I solved it on our end by writing custom input transforms for our 
processing library. It's in Python, not Go, but perhaps it's still 
useful to you. Here's the one for reading WARC files: 
https://github.com/chatnoir-eu/chatnoir-resiliparse/blob/develop/resiliparse/resiliparse/beam/warcio.py

General documentation: 
https://resiliparse.chatnoir.eu/en/stable/api/beam/warcio.html

Janek

Re: Processing multiple files

Posted by Chamikara Jayalath <ch...@google.com>.
On Wed, Mar 16, 2022 at 11:12 PM Pawel <pa...@o2.pl> wrote:

> Hi,
> I'm new to Apache Beam. I'm going to read and filter data from
> CommonCrawl. Recent CommonCrawl dump contains 72000 files which are
> mentioned in
> https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2022-05/wat.paths.gz.
> I want to build a Beam pipeline to read all the files. Unfortunately I am
> stuck on parallelizing this read on top of GCP Dataflow. My code is very
> simple but it seems I'm blocked to just one worker on a step which is
> responsible for passing files to be fetched from AWS. I thought that this
> is maybe because Beam tries to split my work into Bundles but because there
> are only 72000 small strings (links/urls) the Bundle is so small that it
> goes to a single worker. Any ideas how I can overcome this problem? Thanks
> in advance. I will appreciate all the help.
>
>
>
> type extractFn struct {
> }
>
> func (f *extractFn) ProcessElement(ctx context.Context, line string, emit
> func(string)) {
>   emit(line)
> }
>
> type fetchFileFn struct {
> }
>
> func (f *fetchFileFn) ProcessElement(ctx context.Context, url string, emit
> func(string, string)) {
>   if url == "" {
>     return
>   }
>
>   fullURL := "https://commoncrawl.s3.amazonaws.com/" +
> strings.TrimSpace(url)
>   buf, err := gzipio.ReadHTTP(ctx, fullURL)
>   warcCounter.Inc(ctx, 1)
>   if err == nil {
>     for {
>       record, err := warc.ReadWARCRecord(buf)
>       if err != nil {
>         warcCounter.Dec(ctx, 1)
>         return
>      }
>      warcLinksCounter.Inc(ctx, 1)
>      emit(record.URL, record.Body)
>     }
>   }
>   warcCounter.Dec(ctx, 1)
> }
>
>
> func main() {
>   p := beam.NewPipeline()
>   s := p.Root()
>   urls := gzipio.ReadHTTPBeam(s, *input)
>   wetSrc := beam.ParDo(s, &extractFn{}, urls)
>   _ := beam.ParDo(s, &fetchFileFn{}, wetSrc)
>   beamx.Run(context.Background(), p)
> }
>

Probably you just need to break fusion by introducing a Reshuffle [1]
transform after the ReadHTTPBeam step. The way the pipeline is structured
currently, Dataflow will fuse everything into a single step and will run in
a single worker.

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/ab36c1c8c551d7a806b220ae7ba8d18192b2fd2a/sdks/go/pkg/beam/gbk.go#L122

