Posted to user@beam.apache.org by "Almeida, Julius" <Ju...@intuit.com> on 2020/07/10 20:51:11 UTC

FileIO write to new folder every hour.

Hi Team,

I am trying to write files using FileIO, but currently all files fall under the same folder.

I need to write to a new folder every hour,
e.g.: /output/hour-01/files.*.    -> events coming in at hour 1
      /output/hour-02/files.*.    -> events coming in at hour 2

My code:

parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
        FixedWindows.of(Duration.standardMinutes(10)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(0)))
        .apply(Distinct.create())

        .apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(getOutput_schema()))
                .to(outputPath + "hour=" + new DateTime().toString("HH") + "/").withNumShards(1)
                .withSuffix(".snappy.parquet"));


Thanks,
Julius




Re: FileIO write to new folder every hour.

Posted by Chamikara Jayalath <ch...@google.com>.
On Tue, Jul 14, 2020 at 10:38 PM Almeida, Julius <Ju...@intuit.com>
wrote:

> Hi Team,
>
> Thanks Mohil, that seems to be working.
>
> I would also like to control the size of file I write to, is there a way
> to achieve this in beam.
>
> I am currently using this, but not sure if this works.
>
> pipeline.getOptions().as(S3Options.class).setS3UploadBufferSizeBytes(
> 134217728);

You cannot control the exact size of the files, since it depends on how
elements are assigned to windows and bundles and on how the runner splits the
source that precedes your sink. You can, however, control the number of shards
per window:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1178
Note that this might have performance implications.

Thanks,
Cham
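
A minimal sketch of how a shard count could be chosen, assuming you have a rough estimate of how many bytes each window produces. This is only a back-of-the-envelope heuristic, not a Beam API: the class name ShardEstimate and the method shardsFor are hypothetical, and the on-disk size of compressed Parquet files will differ from the input size. The result would be the value passed to withNumShards(...) on the FileIO write.

```java
final class ShardEstimate {

    // Heuristic (an assumption, not part of Beam): if records are spread
    // roughly evenly over shards, each file holds about
    // expectedBytesPerWindow / numShards bytes, so we round up the ratio.
    static int shardsFor(long expectedBytesPerWindow, long targetFileBytes) {
        if (targetFileBytes <= 0) {
            throw new IllegalArgumentException("targetFileBytes must be positive");
        }
        long shards = (expectedBytesPerWindow + targetFileBytes - 1) / targetFileBytes;
        // Always write at least one shard, and stay within int range.
        return (int) Math.max(1L, Math.min(shards, (long) Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // About 1 GB per window with a 128 MiB target (134217728 bytes).
        System.out.println(shardsFor(1_000_000_000L, 134_217_728L));
    }
}
```

Treat the number as a starting point only: actual file sizes still depend on windowing, bundling, and compression, as described above.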



Re: FileIO write to new folder every hour.

Posted by "Almeida, Julius" <Ju...@intuit.com>.
Hi Team,

Thanks Mohil, that seems to be working.

I would also like to control the size of the files I write. Is there a way to achieve this in Beam?

I am currently using this, but I am not sure whether it works.

pipeline.getOptions().as(S3Options.class).setS3UploadBufferSizeBytes(134217728);

Thanks,
Julius





Re: FileIO write to new folder every hour.

Posted by "Almeida, Julius" <Ju...@intuit.com>.
Thanks Mohil. Will give it a try.





Re: FileIO write to new folder every hour.

Posted by Mohil Khare <mo...@prosimo.io>.
BTW, if you want the current hour based on your interval window, you can also do:

class DatePartitionedFileName implements FileIO.Write.FileNaming {

    // Joda-Time formatter that prints the two-digit hour of day.
    private static final DateTimeFormatter DATE_FORMAT =
        DateTimeFormat.forPattern("HH");

    private final String subpath;

    DatePartitionedFileName(String subpath) {
        this.subpath = subpath;
    }

    @Override
    public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
            int shardIndex, Compression compression) {
        IntervalWindow intervalWindow = (IntervalWindow) window;
        // The hour is taken from the start of the window, not from the wall clock.
        return String.format(
            "%s/%s/file.parquet",
            subpath,
            DATE_FORMAT.print(intervalWindow.start()));
    }
}

Regards
Mohil
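
A sketch of the naming logic on its own, under a few assumptions. The question uses 10-minute windows, so six windows fall into each hour, and a fixed name such as file.parquet would presumably be overwritten by each later window. The version below therefore also puts the window bounds and shard index into the name. It uses java.time instead of Joda-Time so that it runs without Beam on the classpath; inside a real FileNaming the two instants would come from intervalWindow.start() and intervalWindow.end(). The class name HourPartitionedNames, the "hour=" folder prefix, and the "part-..." layout are illustrative choices, not anything Beam prescribes.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

final class HourPartitionedNames {

    // Hour of day in UTC, used as the folder name.
    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    // Compact timestamp that distinguishes windows within the same hour.
    private static final DateTimeFormatter STAMP =
        DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmm").withZone(ZoneOffset.UTC);

    // Builds "<subpath>/hour=<HH>/part-<start>-<end>-<shard>-of-<total>.snappy.parquet".
    static String filename(String subpath, Instant windowStart, Instant windowEnd,
                           int numShards, int shardIndex) {
        return String.format(
            Locale.ROOT,
            "%s/hour=%s/part-%s-%s-%05d-of-%05d.snappy.parquet",
            subpath,
            HOUR.format(windowStart),
            STAMP.format(windowStart),
            STAMP.format(windowEnd),
            shardIndex,
            numShards);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-07-10T01:10:00Z");
        Instant end = Instant.parse("2020-07-10T01:20:00Z");
        System.out.println(filename("events", start, end, 1, 0));
    }
}
```

The same String.format call could be dropped into getFilename(...), converting the Joda instants with getMillis() if java.time formatting is preferred.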


Re: FileIO write to new folder every hour.

Posted by Mohil Khare <mo...@prosimo.io>.
Hello Julius,

Well, I do something similar using FileIO.Write.FileNaming, i.e.
https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html

You can do something like the following:
.apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(getOutput_schema()))
                .to(outputPath)
                .withNumShards(1)
                .withNaming(new DatePartitionedFileName(subpath)))

Here your DatePartitionedFileName will implement FileIO.Write.FileNaming and
override its getFilename method, i.e. something like the following:

class DatePartitionedFileName implements FileIO.Write.FileNaming {

    private final String subpath;

    DatePartitionedFileName(String subpath) {
        this.subpath = subpath;
    }

    @Override
    public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
            int shardIndex, Compression compression) {
        IntervalWindow intervalWindow = (IntervalWindow) window;
        return String.format(
            "%s/%s/file.parquet",
            subpath,
            // Placeholder: some function of your own that returns the hour.
            getCurrentHour());
    }
}

Hope this helps.

Regards
Mohil
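
A small illustration of why the original pipeline ends up with a single folder, as far as can be told from the code in the question: the expression new DateTime().toString("HH") is evaluated once, when the pipeline is constructed, so the resulting string never changes while the job runs. A FileNaming is instead called for each window. The sketch below mimics the difference with plain java.time and no Beam dependency; the class and method names (EagerVsPerWindow, eagerPath, perWindowPath) are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

final class EagerVsPerWindow {

    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    // Eager: the hour is baked into the string once, at construction time.
    static String eagerPath(String outputPath, Instant constructionTime) {
        return outputPath + "hour=" + HOUR.format(constructionTime) + "/";
    }

    // Deferred: the returned function is evaluated for every window start.
    static Function<Instant, String> perWindowPath(String outputPath) {
        return windowStart -> outputPath + "hour=" + HOUR.format(windowStart) + "/";
    }

    public static void main(String[] args) {
        String eager = eagerPath("/output/", Instant.parse("2020-07-10T01:05:00Z"));
        Function<Instant, String> deferred = perWindowPath("/output/");
        for (String start : new String[] {"2020-07-10T01:10:00Z", "2020-07-10T02:10:00Z"}) {
            // The eager path stays the same; the deferred path follows the window.
            System.out.println(eager + " vs " + deferred.apply(Instant.parse(start)));
        }
    }
}
```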

