Posted to dev@beam.apache.org by David Gogokhiya <da...@yelp.com> on 2020/08/28 01:44:34 UTC
Beam Memory Issue Followup
Sorry for starting a new thread; for some reason I couldn't reply to the
existing thread at [1]. I have a very naive question about finding a
temporary fix for the memory issue raised in [1]. I know Jan suggested
using two successive overlapping fixed windows with an offset as a
temporary way to deduplicate the events. However, I am wondering whether a
single fixed window of, say, one day, followed by a deduplicate function is
a good alternative. I assume that at the end of the window all the timers
will be cleared, which will result in missing some of the duplicates, but I
am OK with that.
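The boundary effect can be made concrete with a small plain-Python sketch (not Beam code). It maps an event-time timestamp to its fixed window, assuming the usual fixed-window rule start = ts - (ts - offset) % size with times in seconds. `fixed_window` is a hypothetical helper. Two copies of an event, a minute apart but on either side of midnight, land in different one-day windows, so a per-window dedup keeps both. With a half-day `offset` (a parameter that `window.FixedWindows` accepts), the same two copies share a window. That is our reading of why a second, offset window helps, not necessarily the exact design suggested in [1].

```python
DAY = 24 * 60 * 60  # window size used in the pipeline, in seconds


def fixed_window(event_ts, size=DAY, offset=0):
    """Hypothetical helper: [start, end) of the fixed window holding event_ts."""
    start = event_ts - (event_ts - offset) % size
    return (start, start + size)


# Two copies of one event, 30 s before and 30 s after midnight (t = DAY).
before_midnight = fixed_window(DAY - 30)   # (0, 86400)
after_midnight = fixed_window(DAY + 30)    # (86400, 172800): a different window

# Shifting the window boundaries by half a day puts both copies together.
shifted_before = fixed_window(DAY - 30, offset=DAY // 2)  # (43200, 129600)
shifted_after = fixed_window(DAY + 30, offset=DAY // 2)   # (43200, 129600)
```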
My pipeline looks something like the following in this case:
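import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.deduplicate import DeduplicatePerKey
from apache_beam.transforms.trigger import AccumulationMode, AfterCount

# p: beam.Pipeline; FlinkYelpDatapipelineInput and ProcessEventFn are
# custom components (not shown)
(
    p
    | "ReadData" >> FlinkYelpDatapipelineInput()
    # Key each message by its id so duplicates share a key
    | "MapToTuple" >> beam.Map(lambda msg: (msg.id, msg))
    # One-day fixed windows (size in seconds)
    | "Window" >> beam.WindowInto(
        window.FixedWindows(1 * 24 * 60 * 60),
        trigger=AfterCount(1),
        accumulation_mode=AccumulationMode.DISCARDING,
    )
    # DeduplicatePerKey function [2]; duration in seconds
    | "Deduplicate" >> DeduplicatePerKey(processing_time_duration=30 * 60)
    # Drop the key again
    | "UnmapTuple" >> beam.Map(lambda kv: kv[1])
    # Very simple custom ParDo function
    | "SubmitTask" >> beam.ParDo(ProcessEventFn())
)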
It seems to work when I tested it, but I wanted to double-check,
especially given the following statement from the Beam documentation [3]:
"If you set a windowing function using the Window transform, each element
is assigned to a window, but the windows are not considered until
GroupByKey or Combine aggregates across a window and key."
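As we understand Beam's model, the quoted sentence is about aggregations. A stateful DoFn keeps its state and timers per key *and* window, so the window still scopes a per-key dedup even without a GroupByKey. Triggers such as `AfterCount(1)` only govern when a GroupByKey/Combine emits panes. Below is a toy plain-Python model of that behaviour, not Beam itself. It assumes, from our reading of the `DeduplicatePerKey` source in [2], that the transform stores a "seen" flag per key and window and sets a processing-time timer for `processing_time_duration` that clears it. `window_start` and `simulate_dedup` are hypothetical names.

```python
DAY = 24 * 60 * 60   # FixedWindows size used in the pipeline, in seconds
EXPIRY = 30 * 60     # processing_time_duration used in the pipeline, in seconds


def window_start(event_ts, size=DAY):
    """Start of the fixed window (no offset) that an event timestamp falls into."""
    return event_ts - event_ts % size


def simulate_dedup(events, size=DAY, expiry=EXPIRY):
    """Toy model, not Beam: remember the first element per (key, window) and
    forget it `expiry` seconds of processing time later, mimicking a timer
    that clears the state.

    events: list of (key, event_ts, processing_ts) in processing order.
    Returns the (key, event_ts) pairs that would be emitted.
    """
    first_seen = {}  # (key, window start) -> processing time of first sighting
    emitted = []
    for key, event_ts, proc_ts in events:
        state_key = (key, window_start(event_ts, size))
        seen_at = first_seen.get(state_key)
        if seen_at is not None and proc_ts < seen_at + expiry:
            continue  # duplicate: state for this key and window is still held
        first_seen[state_key] = proc_ts  # fresh state and a fresh expiry timer
        emitted.append((key, event_ts))
    return emitted


example_events = [
    ("a", 100, 0),          # first "a" in the day-0 window: emitted
    ("a", 200, 60),         # same key and window, within 30 min: dropped
    ("a", DAY - 1, 120),    # still the day-0 window, within 30 min: dropped
    ("a", DAY + 1, 180),    # day-1 window has its own state: emitted
    ("a", 300, 2000),       # day-0 window, but state expired at 1800: emitted
]
result = simulate_dedup(example_events)
# result == [("a", 100), ("a", DAY + 1), ("a", 300)]
```

In this model, duplicates are missed in two ways: when the copies fall into different windows, and when they arrive more than `processing_time_duration` apart in processing time.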
[1] https://lists.apache.org/thread.html/rae268806035688b77646195505e5b7a56568a38feb1e52d6341feedd%40%3Cdev.beam.apache.org%3E
[2] https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey
[3] https://beam.apache.org/documentation/programming-guide/#windowing