Posted to user@beam.apache.org by Stephan Kotze <st...@gmail.com> on 2022/04/25 14:29:55 UTC

GlobalWindows and default triggers

Hi

We are trying to get a clear understanding of how Beam behaves when
streaming without explicit windowing applied.

The Use Case ->
* A message arrives on PubSub that contains the URIs of a set of files in
GCS that must be processed together.
* The files are read from GCS using FileIO (we need to do some initial
parsing of each file)
* (DoFn) We emit text elements based on Regex
* (DoFn) The text elements are further parsed and now turned into KVs
* We perform Joins / CoGroupByKey / GroupByKeys on the KVs
* (DoFn) Create elements to be written to BigQuery
* Store the elements into BQ (different rows/ tables etc.)


It can almost be seen as a micro-batch being executed per incoming message.
The batches are not massive, and the delay in starting up Dataflow batch
jobs for these (1 to 2 minutes) introduces unnecessary latency. (We want to
process these "microbatches" in a few seconds at most.)
There is no business concept of time-based windowing. The window really is
per input file from PubSub.



Where we are getting a bit stuck is that this pipeline, written with no
triggers at all, works as expected (well, we've not tested edge cases
etc.). That is surprising, because the documentation states:
"If you are using unbounded PCollections, you must use either non-global
windowing or an aggregation trigger in order to perform a GroupByKey or
CoGroupByKey. This is because a bounded GroupByKey or CoGroupByKey must
wait for all the data with a certain key to be collected, but with
unbounded collections, the data is unlimited. Windowing and/or triggers
allow grouping to operate on logical, finite bundles of data within the
unbounded data streams."

To my mind, this indicates that a GroupByKey in a streaming pipeline with
no explicit triggers in the global window (we aren't assigning any windows
explicitly either) should error. But it does not.

So the questions then arise:
* Why can we do the GroupByKeys/joins etc. without triggers? (Why is it not
erroring?)
* When performing joins / GroupByKeys etc., what will be included? Will it
only be elements derived from one source message (the files read for it,
etc.), or can elements from other source messages be included (if they have
the same keys)?
* Essentially, if we do nothing (or if we trigger once per input message),
will only elements from that trigger firing be included?
* Would a trigger such as AfterPane.elementCountAtLeast(1) potentially
combine multiple elements from different source messages (impulses)? And
could this be managed by adding something to our keys that is unique to
every source message?
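
To make the last question concrete, here is a plain-Java sketch (not Beam
code) of what we mean by adding something unique to the keys. It only
simulates a grouping step under the assumption that, within a single global
window, grouping looks at nothing but the key. The Element class, the
messageId field, and the "|" separator are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CompositeKeyGrouping {

  /** Hypothetical parsed element: source message id, business key, value. */
  static final class Element {
    final String messageId;
    final String key;
    final String value;

    Element(String messageId, String key, String value) {
      this.messageId = messageId;
      this.key = key;
      this.value = value;
    }
  }

  /**
   * Groups values by key, the way a key-only grouping in one shared window
   * would. If scopeToMessage is true, the (assumed) source message id is
   * prepended to the key so that elements from different messages can never
   * land in the same group.
   */
  static Map<String, List<String>> groupByKey(
      List<Element> elements, boolean scopeToMessage) {
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (Element e : elements) {
      String k = scopeToMessage ? e.messageId + "|" + e.key : e.key;
      grouped.computeIfAbsent(k, unused -> new ArrayList<>()).add(e.value);
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Element> elements = Arrays.asList(
        new Element("msg-1", "customer-42", "row-a"),
        new Element("msg-2", "customer-42", "row-b"),
        new Element("msg-1", "customer-42", "row-c"));

    // Plain key: elements from msg-1 and msg-2 share one group.
    System.out.println(groupByKey(elements, false));
    // Message-scoped key: one group per (message, key) pair.
    System.out.println(groupByKey(elements, true));
  }
}
```

With the plain key, all three rows end up in one group even though they come
from two messages. With the message-scoped key, msg-1 and msg-2 are kept
apart. Whether Beam's panes behave the same way under the default trigger is
exactly what we are unsure about.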


If someone could help clarify the boundaries here, it would be super
helpful.

Thanks so much,
Stephan

PS.

Locally I'm cheating with:
        PCollection<String> filepatterns =
            p.apply(GenerateSequence.from(0).to(9)
                    .withRate(1, Duration.standardSeconds(1)))
                .apply(MapElements.into(TypeDescriptors.strings())
                    .via(input -> String.format(
                        "/c:/Users/Stephan/dev/word-count-beam/small-%s.txt",
                        input)));

        PCollection<KV<String, String>> kvCollection =
            filepatterns
                .apply(FileIO.matchAll()
                    .withEmptyMatchTreatment(DISALLOW))
                .apply(FileIO.readMatches())


This triggers the reading of files via a sequence rather than PubSub, so we
aren't introducing any watermarking/time dimensions.