Posted to user@beam.apache.org by Ankur Chauhan <an...@malloc64.com> on 2018/04/26 00:02:24 UTC

FileIO - How to get directory per hour

Hi all,

I am working through a use case where I would like to write events from
PubsubIO to GCS. The events are protobuf messages, so I wrote a custom
FileIO.Sink, defined as:

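    import com.google.protobuf.CodedOutputStream;
    import com.google.protobuf.Message;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;
    import javax.annotation.Nullable;
    import org.apache.beam.sdk.io.FileIO;

    // Delimited writer: each message is preceded by its varint-encoded size
    // (the same framing as MessageLite.writeDelimitedTo), so the records in a
    // file can be read back one at a time with parseDelimitedFrom.
    static class ProtobufFileIOSink<T extends Message> implements FileIO.Sink<T> {
        @Nullable private transient CodedOutputStream cos;

        @Override
        public void open(WritableByteChannel channel) {
            this.cos = CodedOutputStream.newInstance(Channels.newOutputStream(channel));
        }

        // Must take T (not Message) to implement FileIO.Sink<T>.write.
        @Override
        public void write(T element) throws IOException {
            if (element == null || cos == null) {
                return;
            }
            // Length prefix first, then the message body.
            cos.writeUInt32NoTag(element.getSerializedSize());
            element.writeTo(cos);
        }

        @Override
        public void flush() throws IOException {
            if (cos != null) {
                cos.flush();
            }
        }
    }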
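The "delimited writer" comment implies that every message in a file is preceded by its length. Protobuf messages are not self-delimiting, so without that prefix the concatenated messages cannot be split apart again when the file is read. A minimal, protobuf-free sketch of that framing, assuming the standard protobuf convention of an unsigned base-128 varint length before each payload (the layout `writeDelimitedTo` and `parseDelimitedFrom` use); `DelimitedFraming` and its methods are hypothetical names for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating length-delimited framing: every record is an
// unsigned base-128 varint length followed by that many payload bytes.
final class DelimitedFraming {
    private DelimitedFraming() {}

    // Varint: 7 payload bits per byte, least-significant group first,
    // high bit set on every byte except the last.
    static void writeVarint(OutputStream out, int value) throws IOException {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Returns -1 if the stream ends cleanly before a new length prefix starts.
    static int readVarint(InputStream in) throws IOException {
        int result = 0;
        for (int shift = 0; shift < 32; shift += 7) {
            int b = in.read();
            if (b == -1) {
                if (shift == 0) {
                    return -1;
                }
                throw new IOException("truncated varint");
            }
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IOException("varint longer than 5 bytes");
    }

    // Concatenates the payloads into one byte stream, each with its length prefix.
    static byte[] encode(List<byte[]> payloads) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            for (byte[] payload : payloads) {
                writeVarint(out, payload.length);
                out.write(payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Splits a framed byte stream back into the original payloads.
    static List<byte[]> decode(byte[] data) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        List<byte[]> payloads = new ArrayList<>();
        try {
            int length;
            while ((length = readVarint(in)) != -1) {
                byte[] payload = new byte[length];
                in.readFully(payload); // EOFException if the record is truncated
                payloads.add(payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return payloads;
    }
}
```

Inside a sink, protobuf already provides this: calling `cos.writeUInt32NoTag(element.getSerializedSize())` before `element.writeTo(cos)` yields the same layout, and `Parser.parseDelimitedFrom(InputStream)` reads the records back one at a time.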

The pipeline that I am using is this:

    p.apply(PubsubIO.readProtos(Billing.BillingMeasurement.class)
            .fromSubscription(input + "-billing.proto")
            .withIdAttribute("event_id"))
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))))
     .apply(FileIO.<Billing.BillingMeasurement>write()
            .via(new ProtobufFileIOSink<>())
            .withCompression(Compression.GZIP)
            .to(output + "/billing.proto/")
            .withSuffix(".pb"));

To make the output easier to sift through, I would like the resulting
files to be organized by year/month/day/hour, so that the paths look
like:

gs://<bucket-name>/<prefixdir>/<year>/<month>/<day>/<hour>/[filename.pb.gz]
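Assuming the hour is taken from the window's start time and rendered in UTC (neither is specified above), the `<year>/<month>/<day>/<hour>` part can be derived with `java.time`. A sketch; `HourlyPaths` is a hypothetical name. Beam's window bounds are Joda-Time instants, so one would first convert with `java.time.Instant.ofEpochMilli(window.start().getMillis())`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: maps an instant to a zero-padded "yyyy/MM/dd/HH" directory in UTC.
final class HourlyPaths {
    // Non-letter characters such as '/' are copied into the output literally.
    private static final DateTimeFormatter HOUR_DIRECTORY =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    private HourlyPaths() {}

    static String hourDirectory(Instant timestamp) {
        return HOUR_DIRECTORY.format(timestamp);
    }

    // Joins a base prefix, the hour directory, and a file name with single slashes.
    static String objectPath(String prefix, Instant timestamp, String fileName) {
        String base = prefix.endsWith("/") ? prefix : prefix + "/";
        return base + hourDirectory(timestamp) + "/" + fileName;
    }
}
```

Zero-padding every component keeps the directories sorting lexicographically in the same order as time, which helps when listing a bucket.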

I tried looking through FileIO.writeDynamic() and FileNaming, but I am not
sure whether that is the right place. Is there an example or another
implementation that someone can point me to?

— Ankur Chauhan

Re: FileIO - How to get directory per hour

Posted by Eugene Kirpichov <ki...@google.com>.
Hi Ankur,
FileIO.write() with FileNaming is the right combination. It should be
something like:

    .apply(FileIO.write().via(...)
        .to("gs://bucket-name/prefixdir")
        .withNumShards(1)
        .withNaming((window, pane, numShardsIgnored, shardIndexIgnored, compressionIgnored) ->
            ...construct a string like "$year/$month/$day/$hour/filename-$pane.pb"...))

Please note that data for a given date/time may arrive multiple times.
FileIO cannot append to existing files (and files on GCS cannot be
appended to anyway). That means your filename MUST include the pane
info (hence the "-pane" in my example above).
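The naming function only has to turn its arguments into a relative path. A plain-Java sketch of such a body, assuming the window start is passed in as a `java.time.Instant`, times are rendered in UTC, and `billing-` is a hypothetical file prefix. Because the pipeline above uses one-minute windows, up to sixty windows share each hour directory, so this sketch puts the window's minute in the name as well as the pane index; the shard index is kept in case `withNumShards` is ever larger than 1.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for the body of a FileIO.Write.FileNaming lambda.
final class HourlyFileNaming {
    // Hour directory plus a per-window file prefix, e.g. "2018/04/26/00/billing-0001".
    // Text in single quotes is literal; '/' is copied through as-is.
    private static final DateTimeFormatter WINDOW_PREFIX =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/'billing-'HHmm").withZone(ZoneOffset.UTC);

    private HourlyFileNaming() {}

    // Distinct (window, pane, shard) triples always map to distinct names.
    static String filename(Instant windowStart, long paneIndex,
                           int shardIndex, int numShards, String suffix) {
        return WINDOW_PREFIX.format(windowStart)
            + "-pane-" + paneIndex
            + "-shard-" + shardIndex + "-of-" + numShards
            + suffix;
    }
}
```

Wired into the write, the lambda from the reply might then read roughly as follows (untested; `IntervalWindow.start()` returns a Joda `Instant`, hence the conversion):

    .withNaming((window, pane, numShards, shardIndex, compression) ->
        HourlyFileNaming.filename(
            java.time.Instant.ofEpochMilli(((IntervalWindow) window).start().getMillis()),
            pane.getIndex(), shardIndex, numShards,
            ".pb" + compression.getSuggestedSuffix()))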
