Posted to user@beam.apache.org by Zoltán Novák <zo...@ericsson.com> on 2018/07/25 12:44:39 UTC

Action upon flushing an output file

Hi,

I'd like some help!

 - I have a pipeline in Beam using Spark runner
 - with windowing based on event-time
 - writing one file per window to HDFS using TextIO
      TextIO features I'm using and would like to keep: to(FilenamePolicy), withWindowedWrites(), withTempDirectory(...)

I'd like to perform some action when a given output file is finished.

By that I mean the point when the file is completely written and flushed and will receive no more appends (I'm using withAllowedLateness(Duration).accumulatingFiredPanes()). For example, imagine a log message such as "File xyz is flushed", or something similar.

How can I do this?

My attempts so far:
Is implementing and adding an additional Sink the right approach for this (with my flush() printing the log message)?
     - It seems that I can't add an additional Sink to TextIO.
     - Could I use FileIO.via()? But then how can I keep my existing TextIO configuration? FileIO does not seem to have the same methods.
     - I also did not find a way to apply my TextIO configuration when using via(TextIO.sink()). Is there a way to do this?
Or am I searching in completely the wrong direction?

BR, Zoltan