You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by David Gogokhiya <da...@yelp.com> on 2020/08/28 01:44:34 UTC

Beam Memory Issue Followup

Sorry for starting a new thread. I couldn't add a response to the existing
thread at [1] for some reason. I have a very naive question related to
finding a temporary fix to the memory issue raised in [1]. I know Jan
suggested to use 2 successive fixed overlapping windows with offset as a
temporary solution to dedup the events. However, I am wondering whether
using a single fixed window of length let's say 1 day followed by a
deduplicate function is a good alternative? I assume that at the end of the
window all the timers will be cleared which will result in missing some of
the duplicates but I am ok with that.

My pipeline looks something like the following in this case:
# p: beam.Pipeline
(
    p    | "ReadData" >> FlinkYelpDatapipelineInput()
    | "MapToTuple" >> beam.Map(lambda msg: (msg.id, msg))
    | "Window" >> beam.WindowInto(
        window.FixedWindows(1 * 24 * 60 * 60),
        trigger=AfterCount(1),
        accumulation_mode=AccumulationMode.DISCARDING,
    )
    # DeduplicatePerKey function [2]
    | "Deduplicate" >> DeduplicatePerKey(
        processing_time_duration=30 * 60
    )
    | "UnmapTuple" >> beam.Map(lambda msg: msg[1])
    # Very simple custom ParDo function
    | "SubmitTask" >> beam.ParDo(ProcessEventFn())
)

It seems to be working when I tested it but I wanted to double check
especially considering the the following statement taken from the Beam
documentation [3]: "If you set a windowing function using the Window
transform, each element is assigned to a window, but the windows are not
considered until GroupByKey or Combine aggregates across a window and key. "

[1]
https://lists.apache.org/thread.html/rae268806035688b77646195505e5b7a56568a38feb1e52d6341feedd%40%3Cdev.beam.apache.org%3E

[2]

https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey

[3]
https://beam.apache.org/documentation/programming-guide/#windowing