Posted to user@beam.apache.org by Zoltán Novák <zo...@ericsson.com> on 2018/07/25 12:44:39 UTC
Action upon flushing an output file
Hi,
I'd like some help!
- I have a pipeline in Beam using the Spark runner
- with windowing based on event-time
- writing one file per window to HDFS using TextIO
/TextIO features I'm using and would like to keep: to(FilenamePolicy), withWindowedWrites(), withTempDirectory(...)/
I'd like to perform some action once a given output file is finished.
By that I mean the point at which the file is completely finished and flushed, and nothing more will be appended to it (I'm using withAllowedLateness(Duration).accumulatingFiredPanes()). Imagine logging a message such as "File xyz is flushed", or something similar.
How can I do this?
/What I have tried so far:
Would implementing and adding an additional Sink work for this? (with my flush() printing the log message)
- It seems that I can't add an additional Sink to TextIO...
- Can I use FileIO.via()? If so, how can I keep my existing TextIO configuration? FileIO does not seem to have the same methods...
- I also did not find a way to apply my TextIO configuration on top of "via(TextIO.sink())". Is there a way to do this?
Or am I searching in completely the wrong direction?/
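
To make the additional-Sink idea above concrete, here is a rough sketch in plain Java, written so that it runs without Beam on the classpath. The Sink interface below is a hypothetical stand-in that only mirrors the open/write/flush shape of Beam's FileIO.Sink; LineSink, FlushNotifyingSink and demo() are names made up for illustration and are not Beam APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FlushLoggingSinkSketch {

  // Hypothetical stand-in with the same open/write/flush shape as FileIO.Sink.
  interface Sink<T> {
    void open(WritableByteChannel channel) throws IOException;
    void write(T element) throws IOException;
    void flush() throws IOException;
  }

  // Minimal text sink (assumption): writes each element as one UTF-8 line.
  static class LineSink implements Sink<String> {
    private WritableByteChannel channel;

    @Override
    public void open(WritableByteChannel channel) {
      this.channel = channel;
    }

    @Override
    public void write(String element) throws IOException {
      ByteBuffer buf = ByteBuffer.wrap((element + "\n").getBytes(StandardCharsets.UTF_8));
      while (buf.hasRemaining()) {
        channel.write(buf);
      }
    }

    @Override
    public void flush() {
      // Nothing is buffered in this sketch.
    }
  }

  // Decorator: delegates everything, then runs a callback after flush().
  static class FlushNotifyingSink<T> implements Sink<T> {
    private final Sink<T> delegate;
    private final Runnable onFlush;

    FlushNotifyingSink(Sink<T> delegate, Runnable onFlush) {
      this.delegate = delegate;
      this.onFlush = onFlush;
    }

    @Override
    public void open(WritableByteChannel channel) throws IOException {
      delegate.open(channel);
    }

    @Override
    public void write(T element) throws IOException {
      delegate.write(element);
    }

    @Override
    public void flush() throws IOException {
      delegate.flush();
      onFlush.run(); // e.g. log "File xyz is flushed"
    }
  }

  // Writes two lines to an in-memory channel and records what happened.
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Sink<String> sink = new FlushNotifyingSink<>(new LineSink(), () -> log.add("flushed"));
    try {
      sink.open(Channels.newChannel(out));
      sink.write("a");
      sink.write("b");
      sink.flush();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    log.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
    return log;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

One caveat with this approach, as far as I understand it: Beam's file-based writes first go to temporary files that are renamed to their final names in a later step, and with accumulatingFiredPanes() a window can fire more than once. A flush() callback would therefore probably signal "this writer is done" rather than "the final file is complete".
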
BR, Zoltan