Posted to dev@beam.apache.org by Borisa Zivkovic <bo...@gmail.com> on 2017/05/11 15:01:31 UTC

TextIO and .withWindowedWrites() - filenamepolicy

Hi guys,

just playing with reading data from PubSub and writing using TextIO.

First thing is that it is very hard to get any output - a lot of temp files
get written, but the final files are not always created.

So, I am playing with triggers etc... If I do the following

PCollection<String> streamData = p.apply(
    PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));

streamData
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(3))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
        .withSuffix(".suff").withNumShards(10));

p.run();

I would expect to see some files in /tmp/ with final results. Unless I add
good triggers I usually do not get any data, only temp files in
/temp/.beam/

But sometimes, when data should be written, I get the following exception

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.UnsupportedOperationException: There is no default policy for
windowed file output. Please provide an explicit FilenamePolicy to generate
filenames.
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
at Test.main(Test.java:50)


Would it make sense to change TextIO so that it does not use only
DefaultFilenamePolicy? If windowedWrites is set and no filename policy was
specified by the user, it could automatically use a custom FilePerWindow
policy. I believe today TextIO always expects the user to specify a
FilenamePolicy, right?

Or maybe have a FilePerWindow policy exposed as part of Beam - I believe
today there are only implementations in tests and examples but nothing
publicly visible, right?



thanks
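
For illustration, here is a minimal sketch of the kind of name a
FilePerWindow-style policy might produce for a fixed window. It is plain Java
and does not use Beam types: the class PerWindowFileNames, its fileName
method, and the timestamp layout are all assumptions made for this example,
not the policy from the samples or anything shipped with Beam.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Beam: builds one file name per fixed window and shard.
final class PerWindowFileNames {
    // Colon-free UTC timestamp layout (an assumption), so names stay file-system friendly.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss").withZone(ZoneOffset.UTC);

    private PerWindowFileNames() {}

    // prefix-windowStart-windowEnd-shard-of-numShards + suffix
    static String fileName(String prefix, Instant windowStart, Instant windowEnd,
                           int shard, int numShards, String suffix) {
        return String.format("%s-%s-%s-%05d-of-%05d%s",
            prefix, FORMAT.format(windowStart), FORMAT.format(windowEnd),
            shard, numShards, suffix);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-05-11T15:00:00Z");
        Instant end = start.plusSeconds(5); // 5-second fixed window, as in the pipeline above
        System.out.println(fileName("/tmp/abc", start, end, 3, 10, ".suff"));
    }
}
```

Putting the window bounds in the name keeps repeated firings of different
windows from overwriting each other; distinguishing several panes of the same
window would need an extra component that this sketch leaves out.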

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Dan Halperin <dh...@google.com.INVALID>.
(we should probably throw an exception at construction time in the various
FileBasedSinks if you use WindowedWrites and the default filename policy
though, that's a no-brainer and it's backwards-compatible.)
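
A rough sketch of what such a construction-time check could look like, in
plain Java. SinkConfig and its two boolean fields are hypothetical stand-ins
invented for this example; they are not Beam classes, and the real
FileBasedSinks may well structure this differently.

```java
// Hypothetical stand-in for a file-based sink's configuration; not a Beam class.
final class SinkConfig {
    private final boolean windowedWrites;
    private final boolean hasCustomFilenamePolicy;

    SinkConfig(boolean windowedWrites, boolean hasCustomFilenamePolicy) {
        this.windowedWrites = windowedWrites;
        this.hasCustomFilenamePolicy = hasCustomFilenamePolicy;
    }

    // Meant to run while the pipeline is being built, before any data is processed.
    void validate() {
        if (windowedWrites && !hasCustomFilenamePolicy) {
            throw new IllegalStateException(
                "Windowed writes need an explicit FilenamePolicy; "
                    + "the default policy cannot name windowed output.");
        }
    }

    public static void main(String[] args) {
        new SinkConfig(false, false).validate(); // unwindowed + default policy: accepted
        new SinkConfig(true, true).validate();   // windowed + custom policy: accepted
        try {
            new SinkConfig(true, false).validate(); // windowed + default policy: rejected
        } catch (IllegalStateException e) {
            System.out.println("Rejected at construction time: " + e.getMessage());
        }
    }
}
```

The point of failing here rather than in the runner is that the user sees the
problem immediately instead of only "sometimes when data should be written".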

On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <dh...@google.com> wrote:

> +Eugene, Reuven who reviewed and implemented this code. They may have
> opinions.
>
> Note that changing the default filename policy would be
> backwards-incompatible, so this would either need to go into 2.0.0 (and a
> new RC3) or it would not go in.
>
> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <
> borisha.zivkovic@gmail.com> wrote:
>
>> great JB, thanks
>>
>> I do not mind working on this - let's see if anyone else has additional
>> input.
>>
>> cheers
>>
>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <jb...@nanthrax.net>
>> wrote:
>>
>> > Got it.
>> >
>> > Yes, agree, I think the PerWindowFilesPolicy could be the default, and
>> > we let the user provide their own policy if they want to.
>> >
>> > Regards
>> > JB
>> >
>> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
>> > > Hi  JB,
>> > >
>> > > yes I saw that thread - I also copied your code but did not want to
>> > > pollute it with my proposal :)
>> > >
>> > > Well ok maybe default FilePerWindow policy for windowedWrites in TextIO
>> > > does not make sense - not sure TBH...
>> > >
>> > > But would it make sense to promote a version of PerWindowFiles from
>> > > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>> > > so that it is easier to provide some kind of PerWindowFiles filename
>> > > policy..
>> > >
>> > >
>> > > something like (where user does not have to write PerWindowFilesPolicy,
>> > > it comes with Beam)
>> > >
>> > >
>> > >
>> > > .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
>> > > .withWindowedWrites()
>> > > .withNumShards(1));
>> > >
>> > > not sure if this was already discussed...
>> > >
>> > > cheers
>> > > Borisa
>> > >
>> > >
>> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>> > >
>> > >> Hi Borisa,
>> > >>
>> > >> You can take a look at the other thread ("Direct runner doesn't seem to
>> > >> finalize checkpoint "quickly"").
>> > >>
>> > >> It's basically the same point ;)
>> > >>
>> > >> The default trigger (event-time) doesn't fire any data. I'm investigating
>> > >> the element timestamp and watermark.
>> > >>
>> > >> I'm also playing with that, for instance:
>> > >>
>> > >> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>> > >>
>> > >> When you use WindowedWrite, you have to provide a filename policy. We
>> > >> could provide a default one, but I'm not sure it would fit well (as it
>> > >> depends a lot on the use case).
>> > >>
>> > >> Regards
>> > >> JB
>> > >>
>> > >> --
>> > >> Jean-Baptiste Onofré
>> > >> jbonofre@apache.org
>> > >> http://blog.nanthrax.net
>> > >> Talend - http://www.talend.com
>> > >>
>> > >
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > jbonofre@apache.org
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>> >
>>
>
>

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Borisa Zivkovic <bo...@gmail.com>.
Great... created this

https://issues.apache.org/jira/browse/BEAM-2276



On Fri, 12 May 2017 at 09:38 Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:

> +1
>
> Borisa, if you want, we can work together on this.
>
> Thanks !
> Regards
> JB
>
> On 05/12/2017 10:33 AM, Borisa Zivkovic wrote:
> > +1 for DefaultFilenamePolicy being able to understand basic windowing...
> > probably the most user-friendly way, and it would cover most needs... in
> > case of special needs users can provide their own policy..
> >
> > another alternative would be to have new class called
> > DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
> >
> > any of those would make it easier for Beam users..
> >
> > so, someone needs to decide how we want to do this and if you want I can
> > work on it...
> >
> > cheers
> >
> > On Fri, 12 May 2017 at 08:18 Reuven Lax <re...@google.com.invalid> wrote:
> >
> >> I believe that for most windows there is a standard stringification.
> >> However I think we could allow the user to inject a window formatter for
> >> cases where there is no good default (e.g. where the window is a
> >> complicated user-defined type, and toString() isn't good enough).
> >>
> >> Alternatively, if we don't want to allow formatters, we could make
> >> DefaultFilenamePolicy work with default stringifications of well-known
> >> windows (fixed, sliding, sessions, etc.), and just use toString() for
> >> remaining window types. Users that have weird custom window types can
> >> always write their own FilenamePolicy.
> >>
> >> On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <
> >> robertwb@google.com.invalid> wrote:
> >>
> >>> I like the idea of WWW and PPP, assuming there is a standard enough
> >>> stringification of windows and panes. However, we may want to elide
> >>> adjacent tokens if the window is global or the pane is the only
> >>> possible (or first?) one to avoid writing things like
> >>> -0000-of-0005---.
> >>>
> >>> On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <re...@google.com.invalid>
> >>> wrote:
> >>>> Another idea - we can extend the existing pattern that
> >>>> DefaultFilenamePolicy understands to include windows.
> >>>>
> >>>> Today it replaces SSS with the shard, and NNN with the number of shards
> >>>> (so many templates contain -SSS-of-NNN). We could also have it recognize
> >>>> WWW and PPP, for the window and the pane respectively.
> >>>>
> >>>> I believe this would be a backwards-compatible change. We do not need to
> >>>> change any existing interfaces, we would simply be allowing the default
> >>>> policy to work on windows.
> >>>>
> >>>
> >>
> >
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
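
To make the SSS / NNN / WWW / PPP idea quoted above concrete, here is a small
plain-Java sketch of template expansion. It is not Beam's
DefaultFilenamePolicy: the class ShardTemplate, the expand method, and the
rule that a run of S or N is zero-padded to the run's length are assumptions
made for this example. Window and pane are passed in as already-formatted
strings.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not Beam's DefaultFilenamePolicy.
final class ShardTemplate {
    // A run of S, N, W, or P is treated as one placeholder. Note that any literal
    // uppercase S/N/W/P in the template would also be replaced.
    private static final Pattern TOKEN = Pattern.compile("S+|N+|W+|P+");

    private ShardTemplate() {}

    static String expand(String template, int shard, int numShards,
                         String window, String pane) {
        Matcher m = TOKEN.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String token = m.group();
            String value;
            switch (token.charAt(0)) {
                case 'S': value = zeroPad(shard, token.length()); break;
                case 'N': value = zeroPad(numShards, token.length()); break;
                case 'W': value = window; break;
                default:  value = pane; break; // 'P'
            }
            // quoteReplacement keeps '$' or '\' in window/pane strings literal.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Zero-pads n to at least `width` digits, e.g. (2, 3) gives "002".
    private static String zeroPad(int n, int width) {
        return String.format("%0" + width + "d", n);
    }

    public static void main(String[] args) {
        System.out.println(expand("-WWW-PPP-SSS-of-NNN", 2, 10, "w1", "pane-0"));
        System.out.println(expand("-SSS-of-NNN", 2, 10, "w1", "pane-0"));
    }
}
```

A template without WWW or PPP expands exactly as before, which is the sense in
which the extension could be backwards-compatible. The sketch does not attempt
to elide separators when the window or pane string is empty.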

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
+1

Borisa, if you want, we can work together on this.

Thanks !
Regards
JB

On 05/12/2017 10:33 AM, Borisa Zivkovic wrote:
> +1 for DefaultFilenamePolicy being able to understand basic windowing...
> probably the most
> user-friendly way that would cover most of needs... in case of special
> needs  users can provide their own policy..
>
> another alternative would be to have new class called
> DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
>
> any of those would make it easier for Beam users..
>
> so, someone needs to decide how we want to do this and if you want I can
> work on it...
>
> cheers
>
> On Fri, 12 May 2017 at 08:18 Reuven Lax <re...@google.com.invalid> wrote:
>
>> I believe that for most windows there is a standard stringification.
>> However I think we could allow the user to inject a window formatter for
>> cases where there is no good default (e.g. where the window is a
>> complicated user-defined type, and toString() isn't good enough.
>>
>> Alternatively, if we don't want allow formatters,, we could make
>> DefaultFilenamePolicy work with default stringifications of well-know
>> windows (fixed, sliding, sessions, etc.), and just use toString() for
>> remaining window types. Users that have weird custom window types can
>> always right their own FilenamePolicy.
>>
>> On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <
>> robertwb@google.com.invalid> wrote:
>>
>>> I like the idea of WWW and PPP, assuming there is a standard enough
>>> stringification of windows and panes. However, we may want to elide
>>> adjacent tokes if the window is global or the pane is the only
>>> possible (or first?) one to avoid writing things like
>>> -0000-of-0005---.
>>>
>>> On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <re...@google.com.invalid>
>>> wrote:
>>>> Another idea - we can extend the existing pattern that
>>>> DefaultFileNamePolicy understands to include windows.
>>>>
>>>> Today it replaces SSS with the shard, and NNN with the number of shards
>>> (so
>>>> many templates contain -SSS-of-NNN). We could also have it recognize
>> WWW
>>>> and PPP, for the window and the pane respectively.
>>>>
>>>> I believe this would be a backwards-compatible change. We do not need
>> to
>>>> change any existing interfaces, we would simply be allowing the default
>>>> policy to work on windows.
>>>>
>>>> On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <dh...@google.com>
>>> wrote:
>>>>
>>>>> +Eugene, Reuven who reviewed and implemented this code. They may have
>>>>> opinions.
>>>>>
>>>>> Note that changing the default filename policy would be
>>>>> backwards-incompatible, so this would either need to go into 2.0.0
>> (and
>>> a
>>>>> new RC3) or it would not go in.
>>>>>
>>>>> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <
>>>>> borisha.zivkovic@gmail.com> wrote:
>>>>>
>>>>>> great JB, thanks
>>>>>>
>>>>>> I do not mind working on this - let's see if anyone else has
>> additional
>>>>>> input.
>>>>>>
>>>>>> cheers
>>>>>>
>>>>>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <jb...@nanthrax.net>
>>>>>> wrote:
>>>>>>
>>>>>>> Got it.
>>>>>>>
>>>>>>> Yes, agree, I think the PerWindowFilesPolicy could be the default
>> and
>>>>>> let
>>>>>>> the
>>>>>>> user provides its own policy if he wants to.
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>> On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
>>>>>>>> Hi  JB,
>>>>>>>>
>>>>>>>> yes I saw that thread - I also copied your code but did not want
>> to
>>>>>>> pollute
>>>>>>>> it with my proposal :)
>>>>>>>>
>>>>>>>> Well ok maybe default FilePerWindow policy for windowedWrites in
>>>>>> TextIO
>>>>>>>> does not make sense - not sure TBH...
>>>>>>>>
>>>>>>>> But would it make sense to promote a version of PerWindowFiles
>> from
>>>>>>>>
>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src
>>>>>> /main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>>>>>>>> so that it is easier to provide some kind of PerWindowFiles
>>> filename
>>>>>>>> policy..
>>>>>>>>
>>>>>>>>
>>>>>>>> something like (where user does not have to write
>>>>>> PerWindowFilesPolicy,
>>>>>>> it
>>>>>>>> comes with Beam)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
>>>>>>>> .withWindowedWrites()
>>>>>>>> .withNumShards(1));
>>>>>>>>
>>>>>>>> not sure if this was already discussed...
>>>>>>>>
>>>>>>>> cheers
>>>>>>>> Borisa
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <
>> jb@nanthrax.net
>>>>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Borisa,
>>>>>>>>>
>>>>>>>>> You can take a look about the other thread ("Direct runner
>> doesn't
>>>>>> seem
>>>>>>> to
>>>>>>>>> finalize checkpoint "quickly"").
>>>>>>>>>
>>>>>>>>> It's basically the same point ;)
>>>>>>>>>
>>>>>>>>> The default trigger (event-time) doesn't fire any data. I'm
>>>>>>> investigating
>>>>>>>>> the
>>>>>>>>> element timestamp and watermark.
>>>>>>>>>
>>>>>>>>> I'm also playing with that, for instance:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src
>>>>>> /main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>>>>>>>>>
>>>>>>>>> When you use WindowedWrite, you have to provide a filename
>>> policy. We
>>>>>>> could
>>>>>>>>> provide a default one, but not sure it will fit fine (as it
>>> depends a
>>>>>>> lot
>>>>>>>>> about
>>>>>>>>> the use cases).
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>>
>>>>>>>>> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
>>>>>>>>>> Hi guys,
>>>>>>>>>>
>>>>>>>>>> just playing with reading data from PubSub and writing using
>>> TextIO.
>>>>>>>>>>
>>>>>>>>>> First thing is that it is very hard to get any output - a lot
>> of
>>>>>> temp
>>>>>>>>> files
>>>>>>>>>> written but not always would get final files created.
>>>>>>>>>>
>>>>>>>>>> So, I am playing with triggers etc... If I do following
>>>>>>>>>>
>>>>>>>>>> PCollection<String> streamData = p.apply(
>>>>>>>>>>         PubsubIO.readStrings().fromTopic("projects/"+
>>> PROJECT_NAME
>>>>>> +
>>>>>>>>>> "/topics/myTopic"));
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>> streamData.apply(Window.<String>into(FixedWindows.of(Duratio
>>>>>> n.standardSeconds(5)))
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirst
>>>>>> ElementInPane().plusDelayOf(Duration.standardSeconds(3))))
>>>>>>>>>>             .withAllowedLateness(Duration.ZERO)
>>>>>>>>>>             .discardingFiredPanes())
>>>>>>>>>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
>>>>>>>>>>     .withSuffix(".suff").withNumShards(10));
>>>>>>>>>>
>>>>>>>>>>     p.run();
>>>>>>>>>>
>>>>>>>>>> I would expect to see some files in /tmp/ with final results..
>>>>>> unless I
>>>>>>>>> add
>>>>>>>>>> good triggers I usually do not get any data.. only temp files
>> in
>>>>>>>>>> /temp/.beam/
>>>>>>>>>>
>>>>>>>>>> but sometimes when data should be written I get following
>>> exception
>>>>>>>>>>
>>>>>>>>>> Exception in thread "main"
>>>>>>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>>>>>>>> java.lang.UnsupportedOperationException: There is no default
>>>>>> policy for
>>>>>>>>>> windowed file output. Please provide an explicit FilenamePolicy
>>> to
>>>>>>>>> generate
>>>>>>>>>> filenames.
>>>>>>>>>> at
>>>>>>>>>>
>>>>>>>>>
>>>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineRe
>>>>>> sult.waitUntilFinish(DirectRunner.java:322)
>>>>>>>>>> at
>>>>>>>>>>
>>>>>>>>>
>>>>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineRe
>>>>>> sult.waitUntilFinish(DirectRunner.java:292)
>>>>>>>>>> at
>>>>>>> org.apache.beam.runners.direct.DirectRunner.run(
>>> DirectRunner.java:200)
>>>>>>>>>> at
>>>>>>> org.apache.beam.runners.direct.DirectRunner.run(
>>> DirectRunner.java:63)
>>>>>>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
>>>>>>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
>>>>>>>>>> at Test.main(Test.java:50)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Would it make sense to change TextIO so that it does not
>>>>>>>>>> use DefaultFilenamePolicy only - but in case there are
>>>>>> windowedWrites
>>>>>>> and
>>>>>>>>>> no filename policy was specified by user it could actually use
>>>>>> custom
>>>>>>>>>> FilePerWindow policy automatically. I believe today TextIO
>> always
>>>>>>> expects
>>>>>>>>>> user to specify FilenamePolicy, right?
>>>>>>>>>>
>>>>>>>>>> Or maybe to have FilePerWindow policy exposed as part of Beam
>> - I
>>>>>>> believe
>>>>>>>>>> today there are only implementations in tests and examples but
>>>>>> nothing
>>>>>>>>>> publicly visible, right?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> thanks
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>> jbonofre@apache.org
>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbonofre@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>
>>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
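
As a sketch of the window-formatter idea raised earlier in this thread
(user-supplied formatters for some window types, toString() for the rest),
something along these lines might work. Everything here is hypothetical:
WindowFormatters and IntervalStandIn are plain-Java stand-ins invented for the
example and are not Beam classes or a proposed Beam API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry of per-type window formatters with a toString() fallback.
final class WindowFormatters {
    private final Map<Class<?>, Function<Object, String>> formatters = new LinkedHashMap<>();

    // Registers a formatter for exactly this class (subclasses are not matched).
    <T> WindowFormatters register(Class<T> type, Function<? super T, String> formatter) {
        formatters.put(type, w -> formatter.apply(type.cast(w)));
        return this;
    }

    // Uses the registered formatter if there is one, otherwise falls back to toString().
    String format(Object window) {
        Function<Object, String> f = formatters.get(window.getClass());
        return f != null ? f.apply(window) : window.toString();
    }
}

// Stand-in for an interval-style window (fixed, sliding, session); not a Beam type.
final class IntervalStandIn {
    final long startMillis;
    final long endMillis;

    IntervalStandIn(long startMillis, long endMillis) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
    }
}
```

A caller could then register a formatter for the interval type and let every
other window type fall through to toString():

```java
WindowFormatters formatters = new WindowFormatters()
    .register(IntervalStandIn.class, w -> w.startMillis + "-" + w.endMillis);
String name = formatters.format(new IntervalStandIn(0L, 5000L));
```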

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Borisa Zivkovic <bo...@gmail.com>.
yep... I figured that myself..

let me work with JB and others to see if we can come up with satisfactory
default implementation

On Fri, 12 May 2017 at 16:14 Dan Halperin <dh...@google.com.invalid>
wrote:

> DefaultFilenamePolicy as currently written only accepts a single shard name
> template. If that template is windowed, it won't work for unwindowed writes
> (it will have -WWW-PPP or .WWW.PPP or something.WWW.something.PPPP?). And
> vice versa for unwindowed templates used with windowed writes.
>
> There is likely a way to fix this more easily in the File IOs themselves.
>
> On Fri, May 12, 2017 at 4:33 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
> > Hi Reuven,
> >
> > yes, it's what I have in mind: just provide the default logic in the
> > existing method.
> >
> > Regards
> > JB
> >
> >
> > On 05/12/2017 12:46 PM, Reuven Lax wrote:
> >
> >> DefaultFilenamePolicy already contains a windowedFilename override (today
> >> it throws an exception), so I don't think there's any need for a new
> >> class. We can simply fill out the existing method.
> >>
> >> On May 12, 2017 11:34 AM, "Borisa Zivkovic" <borisha.zivkovic@gmail.com
> >
> >> wrote:
> >>
> >> +1 for DefaultFilenamePolicy being able to understand basic windowing...
> >> probably the most
> >> user-friendly way that would cover most of needs... in case of special
> >> needs  users can provide their own policy..
> >>
> >> another alternative would be to have new class called
> >> DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
> >>
> >> any of those would make it easier for Beam users..
> >>
> >> so, someone needs to decide how we want to do this and if you want I can
> >> work on it...
> >>
> >> cheers
> >>
> >> On Fri, 12 May 2017 at 08:18 Reuven Lax <re...@google.com.invalid>
> wrote:
> >>
> >> I believe that for most windows there is a standard stringification.
> >>> However I think we could allow the user to inject a window formatter
> for
> >>> cases where there is no good default (e.g. where the window is a
> >>> complicated user-defined type, and toString() isn't good enough.
> >>>
> >>> Alternatively, if we don't want allow formatters,, we could make
> >>> DefaultFilenamePolicy work with default stringifications of well-know
> >>> windows (fixed, sliding, sessions, etc.), and just use toString() for
> >>> remaining window types. Users that have weird custom window types can
> >>> always right their own FilenamePolicy.
> >>>
> >>> On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <
> >>> robertwb@google.com.invalid> wrote:
> >>>
> >>> I like the idea of WWW and PPP, assuming there is a standard enough
> >>>> stringification of windows and panes. However, we may want to elide
> >>>> adjacent tokes if the window is global or the pane is the only
> >>>> possible (or first?) one to avoid writing things like
> >>>> -0000-of-0005---.
> >>>>
> >>>> On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <relax@google.com.invalid
> >
> >>>> wrote:
> >>>>
> >>>>> Another idea - we can extend the existing pattern that
> >>>>> DefaultFileNamePolicy understands to include windows.
> >>>>>
> >>>>> Today it replaces SSS with the shard, and NNN with the number of
> >>>>>
> >>>> shards
> >>
> >>> (so
> >>>>
> >>>>> many templates contain -SSS-of-NNN). We could also have it recognize
> >>>>>
> >>>> WWW
> >>>
> >>>> and PPP, for the window and the pane respectively.
> >>>>>
> >>>>> I believe this would be a backwards-compatible change. We do not need
> >>>>>
> >>>> to
> >>>
> >>>> change any existing interfaces, we would simply be allowing the
> >>>>>
> >>>> default
> >>
> >>> policy to work on windows.
> >>>>>
> >>>>> On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <dh...@google.com> wrote:
> >>>>>
> >>>>>> +Eugene, Reuven who reviewed and implemented this code. They may have
> >>>>>> opinions.
> >>>>>>
> >>>>>> Note that changing the default filename policy would be
> >>>>>> backwards-incompatible, so this would either need to go into 2.0.0
> >>>>>> (and a new RC3) or it would not go in.
> >>>>>>
> >>>>>> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <borisha.zivkovic@gmail.com> wrote:
> >>>>>>
> >>>>>>> great JB, thanks
> >>>>>>>
> >>>>>>> I do not mind working on this - let's see if anyone else has
> >>>>>>> additional input.
> >>>>>>>
> >>>>>>> cheers
> >>>>>>>
> >>>>>>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
> >>>>>>>
> >>>>>>>> Got it.
> >>>>>>>>
> >>>>>>>> Yes, agreed, I think the PerWindowFilesPolicy could be the default,
> >>>>>>>> letting users provide their own policy if they want to.
> >>>>>>>>
> >>>>>>>> Regards
> >>>>>>>> JB
> >>>>>>>>
> >>>>>>>> On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> >>>>>>>>
> >>>>>>>>> Hi JB,
> >>>>>>>>>
> >>>>>>>>> yes I saw that thread - I also copied your code but did not want to
> >>>>>>>>> pollute it with my proposal :)
> >>>>>>>>>
> >>>>>>>>> Well ok maybe default FilePerWindow policy for windowedWrites in
> >>>>>>>>> TextIO does not make sense - not sure TBH...
> >>>>>>>>>
> >>>>>>>>> But would it make sense to promote a version of PerWindowFiles from
> >>>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>>>>>>>> so that it is easier to provide some kind of PerWindowFiles filename
> >>>>>>>>> policy..
> >>>>>>>>>
> >>>>>>>>> something like (where user does not have to write
> >>>>>>>>> PerWindowFilesPolicy, it comes with Beam)
> >>>>>>>>>
> >>>>>>>>> .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> >>>>>>>>> .withWindowedWrites()
> >>>>>>>>> .withNumShards(1));
> >>>>>>>>>
> >>>>>>>>> not sure if this was already discussed...
> >>>>>>>>>
> >>>>>>>>> cheers
> >>>>>>>>> Borisa
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Borisa,
> >>>>>>>>>>
> >>>>>>>>>> You can take a look at the other thread ("Direct runner doesn't seem
> >>>>>>>>>> to finalize checkpoint "quickly"").
> >>>>>>>>>>
> >>>>>>>>>> It's basically the same point ;)
> >>>>>>>>>>
> >>>>>>>>>> The default trigger (event-time) doesn't fire any data. I'm
> >>>>>>>>>> investigating the element timestamp and watermark.
> >>>>>>>>>>
> >>>>>>>>>> I'm also playing with that, for instance:
> >>>>>>>>>>
> >>>>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>>>>>>>>>
> >>>>>>>>>> When you use WindowedWrite, you have to provide a filename policy. We
> >>>>>>>>>> could provide a default one, but not sure it will fit fine (as it
> >>>>>>>>>> depends a lot on the use cases).
> >>>>>>>>>>
> >>>>>>>>>> Regards
> >>>>>>>>>> JB
> >>>>>>>>>>
> >>>>>>>>>> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi guys,
> >>>>>>>>>>>
> >>>>>>>>>>> just playing with reading data from PubSub and writing using TextIO.
> >>>>>>>>>>>
> >>>>>>>>>>> First thing is that it is very hard to get any output - a lot of temp
> >>>>>>>>>>> files written but not always would get final files created.
> >>>>>>>>>>>
> >>>>>>>>>>> So, I am playing with triggers etc... If I do following
> >>>>>>>>>>>
> >>>>>>>>>>> PCollection<String> streamData = p.apply(
> >>>>>>>>>>>         PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
> >>>>>>>>>>>
> >>>>>>>>>>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >>>>>>>>>>>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
> >>>>>>>>>>>             .withAllowedLateness(Duration.ZERO)
> >>>>>>>>>>>             .discardingFiredPanes())
> >>>>>>>>>>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >>>>>>>>>>>     .withSuffix(".suff").withNumShards(10));
> >>>>>>>>>>>
> >>>>>>>>>>>     p.run();
> >>>>>>>>>>>
> >>>>>>>>>>> I would expect to see some files in /tmp/ with final results.. unless I
> >>>>>>>>>>> add good triggers I usually do not get any data.. only temp files in
> >>>>>>>>>>> /temp/.beam/
> >>>>>>>>>>>
> >>>>>>>>>>> but sometimes when data should be written I get following exception
> >>>>>>>>>>>
> >>>>>>>>>>> Exception in thread "main"
> >>>>>>>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >>>>>>>>>>> java.lang.UnsupportedOperationException: There is no default policy for
> >>>>>>>>>>> windowed file output. Please provide an explicit FilenamePolicy to
> >>>>>>>>>>> generate filenames.
> >>>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> >>>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> >>>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >>>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >>>>>>>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >>>>>>>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >>>>>>>>>>> at Test.main(Test.java:50)
> >>>>>>>>>>>
> >>>>>>>>>>> Would it make sense to change TextIO so that it does not
> >>>>>>>>>>> use DefaultFilenamePolicy only - but in case there are windowedWrites
> >>>>>>>>>>> and no filename policy was specified by user it could actually use
> >>>>>>>>>>> custom FilePerWindow policy automatically. I believe today TextIO always
> >>>>>>>>>>> expects user to specify FilenamePolicy, right?
> >>>>>>>>>>>
> >>>>>>>>>>> Or maybe to have FilePerWindow policy exposed as part of Beam - I
> >>>>>>>>>>> believe today there are only implementations in tests and examples but
> >>>>>>>>>>> nothing publicly visible, right?
> >>>>>>>>>>>
> >>>>>>>>>>> thanks
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> Jean-Baptiste Onofré
> >>>>>>>>>> jbonofre@apache.org
> >>>>>>>>>> http://blog.nanthrax.net
> >>>>>>>>>> Talend - http://www.talend.com
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>> --
> >>>>>>>> Jean-Baptiste Onofré
> >>>>>>>> jbonofre@apache.org
> >>>>>>>> http://blog.nanthrax.net
> >>>>>>>> Talend - http://www.talend.com
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> > --
> > Jean-Baptiste Onofré
> > jbonofre@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Borisa Zivkovic <bo...@gmail.com>.
Created PR https://github.com/apache/beam/pull/3142

On Fri, 12 May 2017 at 16:31 Reuven Lax <re...@google.com.invalid> wrote:

> Can we simply fail WWW if windowed writes is not set? Or at least warn?
>
> On May 12, 2017 6:14 PM, "Dan Halperin" <dh...@google.com.invalid>
> wrote:
>
> DefaultFilenamePolicy as currently written only accepts a single shard name
> template. If that template is windowed, it won't work for unwindowed writes
> (it will have -WWW-PPP or .WWW.PPP or something.WWW.something.PPPP?). And
> vice versa for unwindowed templates used with windowed writes.
>
> There is likely a way to fix this more easily in the File IOs themselves.
>
> On Fri, May 12, 2017 at 4:33 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
> > Hi Reuven,
> >
> > yes, it's what I have in mind: just provide the default logic in the
> > existing method.
> >
> > Regards
> > JB
> >
> >
> > On 05/12/2017 12:46 PM, Reuven Lax wrote:
> >
> >> DefaultFilenamePolicy already contains a windowedFilename override (today
> >> it throws an exception), so I don't think there's any need for a new
> >> class.
> >> We can simply fill out the existing method.
> >>
> >> On May 12, 2017 11:34 AM, "Borisa Zivkovic" <borisha.zivkovic@gmail.com>
> >> wrote:
> >>
> >> +1 for DefaultFilenamePolicy being able to understand basic windowing...
> >> probably the most user-friendly way that would cover most needs... in case
> >> of special needs users can provide their own policy..
> >>
> >> another alternative would be to have new class called
> >> DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
> >>
> >> any of those would make it easier for Beam users..
> >>
> >> so, someone needs to decide how we want to do this and if you want I can
> >> work on it...
> >>
> >> cheers
> >>
> > --
> > Jean-Baptiste Onofré
> > jbonofre@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Reuven Lax <re...@google.com.INVALID>.
Can we simply fail WWW if windowed writes is not set? Or at least warn?
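
A minimal sketch of the check suggested here, assuming the WWW and PPP placeholders proposed earlier in the thread. The class and method names (TemplateValidationSketch, validate) are hypothetical and are not part of Beam; a real change would presumably live inside the file IOs or the filename policy.

```java
// Hypothetical sketch, not Beam API: reject window/pane placeholders
// in a shard template when windowed writes are not enabled.
class TemplateValidationSketch {

    /** Returns true if the template uses the proposed WWW or PPP placeholders. */
    static boolean mentionsWindowOrPane(String shardTemplate) {
        return shardTemplate.contains("WWW") || shardTemplate.contains("PPP");
    }

    /** Throws if a windowed template is combined with unwindowed writes. */
    static void validate(String shardTemplate, boolean windowedWrites) {
        if (mentionsWindowOrPane(shardTemplate) && !windowedWrites) {
            throw new IllegalArgumentException(
                "Shard template '" + shardTemplate
                    + "' contains WWW or PPP but windowed writes are not enabled");
        }
    }

    public static void main(String[] args) {
        validate("-SSS-of-NNN", false);         // accepted: no window placeholders
        validate("-WWW-PPP-SSS-of-NNN", true);  // accepted: windowed writes enabled
        try {
            validate("-WWW-PPP-SSS-of-NNN", false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The "at least warn" variant would log the same message instead of throwing.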

On May 12, 2017 6:14 PM, "Dan Halperin" <dh...@google.com.invalid> wrote:

DefaultFilenamePolicy as currently written only accepts a single shard name
template. If that template is windowed, it won't work for unwindowed writes
(it will have -WWW-PPP or .WWW.PPP or something.WWW.something.PPPP?). And
vice versa for unwindowed templates used with windowed writes.

There is likely a way to fix this more easily in the File IOs themselves.

On Fri, May 12, 2017 at 4:33 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Reuven,
>
> yes, it's what I have in mind: just provide the default logic in the
> existing method.
>
> Regards
> JB
>
>
> On 05/12/2017 12:46 PM, Reuven Lax wrote:
>
>> DefaultFilenamePolicy already contains a windowedFilename override (today
>> it throws an exception), so I don't think there's any need for a new
>> class.
>> We can simply fill out the existing method.
>>
>> On May 12, 2017 11:34 AM, "Borisa Zivkovic" <bo...@gmail.com>
>> wrote:
>>
>> +1 for DefaultFilenamePolicy being able to understand basic windowing...
>> probably the most user-friendly way that would cover most needs... in case
>> of special needs users can provide their own policy..
>>
>> another alternative would be to have new class called
>> DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
>>
>> any of those would make it easier for Beam users..
>>
>> so, someone needs to decide how we want to do this and if you want I can
>> work on it...
>>
>> cheers
>>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Dan Halperin <dh...@google.com.INVALID>.
DefaultFilenamePolicy as currently written only accepts a single shard name
template. If that template is windowed, it won't work for unwindowed writes
(it will have -WWW-PPP or .WWW.PPP or something.WWW.something.PPPP?). And
vice versa for unwindowed templates used with windowed writes.

There is likely a way to fix this more easily in the File IOs themselves.
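
To make the single-template concern concrete, here is a minimal sketch in
plain Java. It is not Beam's DefaultFilenamePolicy: the class name
ShardTemplateSketch, the expand method, and the rule that runs of W and P
are replaced by the window and pane strings are all assumptions made for
illustration. It only shows why one template is awkward for both windowed
and unwindowed writes.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of shard-template expansion; not Beam's implementation. */
final class ShardTemplateSketch {
    // Assumption: any run of S, N, W or P in the template is a placeholder.
    private static final Pattern PLACEHOLDER = Pattern.compile("S+|N+|W+|P+");

    /** Expands the template; window and pane are null for an unwindowed write. */
    static String expand(String template, int shard, int numShards, String window, String pane) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        int last = 0;
        while (m.find()) {
            out.append(template, last, m.start());
            String run = m.group();
            switch (run.charAt(0)) {
                case 'S': out.append(pad(shard, run.length())); break;
                case 'N': out.append(pad(numShards, run.length())); break;
                case 'W': out.append(window == null ? "" : window); break;
                default:  out.append(pane == null ? "" : pane); break;
            }
            last = m.end();
        }
        out.append(template, last, template.length());
        return out.toString();
    }

    // Zero-pads the value to the width of the placeholder run.
    private static String pad(int value, int width) {
        return String.format(Locale.ROOT, "%0" + width + "d", value);
    }

    public static void main(String[] args) {
        // Unwindowed template, unwindowed write: a clean name.
        System.out.println(expand("-SSS-of-NNN", 3, 10, null, null));
        // Windowed template, unwindowed write: stray separators remain.
        System.out.println(expand("-WWW-PPP-SSS-of-NNN", 3, 10, null, null));
        // Windowed template, windowed write.
        System.out.println(expand("-WWW-PPP-SSS-of-NNN", 3, 10, "w1", "pane-0"));
    }
}
```

The second call is the case described above: with no window or pane to
substitute, the windowed template leaves its separators behind.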

On Fri, May 12, 2017 at 4:33 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Reuven,
>
> yes, it's what I have in mind: just provide the default logic in the
> existing method.
>
> Regards
> JB

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Reuven,

yes, it's what I have in mind: just provide the default logic in the existing 
method.

Regards
JB

On 05/12/2017 12:46 PM, Reuven Lax wrote:
> DefaultFilenamePolicy already contains a windowedFilename override (today
> it throws an exception), so I don't think there's any need for a new class.
> We can simply fill out the existing method.

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Reuven Lax <re...@google.com.INVALID>.
DefaultFilenamePolicy already contains a windowedFilename override (today
it throws an exception), so I don't think there's any need for a new class.
We can simply fill out the existing method.
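
As a rough illustration of "filling out the existing method", here is a
sketch in plain Java. The types below (NamingPolicySketch, ThrowingPolicy,
FilledInPolicy) are hypothetical stand-ins with simplified signatures, not
Beam's FilenamePolicy API, and the resulting name layout is only one
possible choice.

```java
import java.util.Locale;

/** Hypothetical stand-in for a filename policy; not Beam's actual class. */
abstract class NamingPolicySketch {
    abstract String unwindowedFilename(String prefix, int shard, int numShards);

    abstract String windowedFilename(String prefix, String window, int shard, int numShards);

    // Shared "-SSSSS-of-NNNNN" suffix used by both naming methods.
    static String shardSuffix(int shard, int numShards) {
        return String.format(Locale.ROOT, "-%05d-of-%05d", shard, numShards);
    }
}

/** The shape described in the thread: windowed naming is rejected. */
final class ThrowingPolicy extends NamingPolicySketch {
    @Override
    String unwindowedFilename(String prefix, int shard, int numShards) {
        return prefix + shardSuffix(shard, numShards);
    }

    @Override
    String windowedFilename(String prefix, String window, int shard, int numShards) {
        throw new UnsupportedOperationException(
            "There is no default policy for windowed file output.");
    }
}

/** The same policy with the windowed method filled in (assumed layout). */
final class FilledInPolicy extends NamingPolicySketch {
    @Override
    String unwindowedFilename(String prefix, int shard, int numShards) {
        return prefix + shardSuffix(shard, numShards);
    }

    @Override
    String windowedFilename(String prefix, String window, int shard, int numShards) {
        return prefix + "-" + window + shardSuffix(shard, numShards);
    }
}

final class WindowedPolicyDemo {
    public static void main(String[] args) {
        System.out.println(new FilledInPolicy().windowedFilename("/tmp/abc", "w1", 3, 10));
        try {
            new ThrowingPolicy().windowedFilename("/tmp/abc", "w1", 3, 10);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

No interface changes are needed in this sketch: callers keep using the same
two methods, and only the body of the windowed one differs.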

On May 12, 2017 11:34 AM, "Borisa Zivkovic" <bo...@gmail.com>
wrote:

> +1 for DefaultFilenamePolicy being able to understand basic windowing...
> probably the most user-friendly way, and it would cover most needs... in
> case of special needs users can provide their own policy..
>
> Another alternative would be to have a new class called
> DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
>
> Either of those would make it easier for Beam users..
>
> So, someone needs to decide how we want to do this, and if you want I can
> work on it...
>
> cheers


Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Borisa Zivkovic <bo...@gmail.com>.
+1 for DefaultFilenamePolicy being able to understand basic windowing...
probably the most user-friendly way, and it would cover most needs... in
case of special needs users can provide their own policy..

Another alternative would be to have a new class called
DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...

Either of those would make it easier for Beam users..

So, someone needs to decide how we want to do this, and if you want I can
work on it...

cheers
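
A small sketch of what "basic windowing by default, own policy for special
needs" could look like, in plain Java. Everything here is an assumption for
illustration: WindowNamingSketch, its Interval class, and the formatter
parameter are hypothetical and do not correspond to existing Beam classes.

```java
import java.time.Instant;
import java.util.function.Function;

/** Hypothetical sketch of default window naming with an optional override. */
final class WindowNamingSketch {
    /** Hypothetical fixed window, identified by its start and end. */
    static final class Interval {
        final Instant start;
        final Instant end;

        Interval(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Default stringification for a known window type, toString() otherwise. */
    static String defaultFormat(Object window) {
        if (window instanceof Interval) {
            Interval w = (Interval) window;
            return w.start + "-" + w.end;
        }
        return String.valueOf(window);
    }

    /** Builds a name; a null formatter means "use the default". */
    static String filename(String prefix, Object window, Function<Object, String> formatter) {
        Function<Object, String> f =
            formatter != null ? formatter : WindowNamingSketch::defaultFormat;
        return prefix + "-" + f.apply(window);
    }

    public static void main(String[] args) {
        Interval w = new Interval(
            Instant.parse("2017-05-11T15:00:00Z"), Instant.parse("2017-05-11T15:00:05Z"));
        // Default naming for a basic window.
        System.out.println(filename("/tmp/abc", w, null));
        // A user with special needs supplies a formatter.
        System.out.println(filename("/tmp/abc", w, x -> "custom"));
    }
}
```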

On Fri, 12 May 2017 at 08:18 Reuven Lax <re...@google.com.invalid> wrote:

> I believe that for most windows there is a standard stringification.
> However, I think we could allow the user to inject a window formatter for
> cases where there is no good default (e.g. where the window is a
> complicated user-defined type and toString() isn't good enough).
>
> Alternatively, if we don't want to allow formatters, we could make
> DefaultFilenamePolicy work with default stringifications of well-known
> windows (fixed, sliding, sessions, etc.), and just use toString() for
> remaining window types. Users that have weird custom window types can
> always write their own FilenamePolicy.

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Reuven Lax <re...@google.com.INVALID>.
I believe that for most windows there is a standard stringification.
However, I think we could allow the user to inject a window formatter for
cases where there is no good default (e.g. where the window is a
complicated user-defined type and toString() isn't good enough).

Alternatively, if we don't want to allow formatters, we could make
DefaultFilenamePolicy work with default stringifications of well-known
windows (fixed, sliding, sessions, etc.), and just use toString() for the
remaining window types. Users that have weird custom window types can
always write their own FilenamePolicy.
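
To make the formatter idea concrete, here is a minimal sketch in plain Java.
It is only an illustration under stated assumptions: WindowFormatterSketch,
Interval and register() are hypothetical names, not Beam APIs, and Interval
merely stands in for a fixed or sliding window. Known window types get a
default formatter, users can inject their own, and everything else falls
back to toString().

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of the formatter idea; none of these names are Beam APIs. */
class WindowFormatterSketch {

  /** Stand-in for a fixed or sliding window: a [start, end) interval in millis. */
  static final class Interval {
    final long startMillis;
    final long endMillis;

    Interval(long startMillis, long endMillis) {
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }
  }

  // Formatters keyed by exact window class; anything else falls back to toString().
  private final Map<Class<?>, Function<Object, String>> formatters = new HashMap<>();

  WindowFormatterSketch() {
    // Built-in default for the well-known interval case.
    register(Interval.class,
        w -> Instant.ofEpochMilli(w.startMillis) + "-" + Instant.ofEpochMilli(w.endMillis));
  }

  /** Lets a user inject a formatter for a custom window type. */
  <T> WindowFormatterSketch register(Class<T> type, Function<? super T, String> formatter) {
    formatters.put(type, w -> formatter.apply(type.cast(w)));
    return this;
  }

  /** Uses a registered formatter if there is one, otherwise toString(). */
  String format(Object window) {
    Function<Object, String> f = formatters.get(window.getClass());
    return f != null ? f.apply(window) : window.toString();
  }

  public static void main(String[] args) {
    WindowFormatterSketch fmt = new WindowFormatterSketch();
    System.out.println(fmt.format(new Interval(0L, 5000L)));
    System.out.println(fmt.format("some-custom-window")); // no formatter: toString()
  }
}
```

The lookup is by exact class to keep the sketch short; a real policy would
probably also want to handle subclasses and the global window.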

On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <
robertwb@google.com.invalid> wrote:

> I like the idea of WWW and PPP, assuming there is a standard enough
> stringification of windows and panes. However, we may want to elide
> adjacent tokens if the window is global or the pane is the only
> possible (or first?) one to avoid writing things like
> -0000-of-0005---.

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
Windows are a user-defined type, though there aren't too many occasions for
wildly interesting ones. In other places where we need to key by them
(state namespaces) the solution is not readable, as it goes through the
coder and then injects that result into the allowed keyspace (e.g. via
base64 encoding).

Since every window has a maximum timestamp along with whatever other
user-defined stuff, prefixing with the ISO format of the max timestamp and
suffixing with the unreadable full encoding might work.

Otherwise, new APIs are needed, I think.

+1 that the transform should own validating its naming policy against the
input's windowing strategy.
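
A rough sketch of the "readable prefix, opaque suffix" naming in plain Java,
under stated assumptions: WindowKeySketch and windowKey() are hypothetical
names, and the byte array stands in for whatever the window's coder would
produce. No Beam API is used here.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;

/** Hypothetical sketch; the encoded bytes stand in for a window coder's output. */
class WindowKeySketch {

  /**
   * Builds "<ISO max timestamp>-<base64 of the encoded window>".
   * The prefix is human-readable and sorts by time; the suffix keeps windows
   * that share a max timestamp distinct.
   */
  static String windowKey(long maxTimestampMillis, byte[] encodedWindow) {
    String readable = Instant.ofEpochMilli(maxTimestampMillis).toString();
    // URL-safe alphabet without padding avoids '/', '+' and '=' in file names.
    String opaque = Base64.getUrlEncoder().withoutPadding().encodeToString(encodedWindow);
    return readable + "-" + opaque;
  }

  public static void main(String[] args) {
    byte[] encoded = "w1".getBytes(StandardCharsets.UTF_8); // placeholder encoding
    System.out.println(windowKey(4999L, encoded));
  }
}
```

Note that the ISO form contains ':' characters, which some filesystems do not
accept, so a real policy might need a different timestamp pattern.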

On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <
robertwb@google.com.invalid> wrote:

> I like the idea of WWW and PPP, assuming there is a standard enough
> stringification of windows and panes. However, we may want to elide
> adjacent tokens if the window is global or the pane is the only
> possible (or first?) one to avoid writing things like
> -0000-of-0005---.

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
I like the idea of WWW and PPP, assuming there is a standard enough
stringification of windows and panes. However, we may want to elide
adjacent tokens if the window is global or the pane is the only
possible (or first?) one to avoid writing things like
-0000-of-0005---.
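
One possible way to do the eliding, sketched in plain Java. The names
ElideEmptyTokensSketch and join() are hypothetical, and the assumption is
that an empty string represents "global window" or "only pane". Empty parts
are dropped before joining, so no dangling separators are written.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; an empty window or pane string means "nothing to print". */
class ElideEmptyTokensSketch {

  /** Joins the shard part with the non-empty window and pane parts using '-'. */
  static String join(String shardPart, String window, String pane) {
    List<String> parts = new ArrayList<>();
    parts.add(shardPart);
    if (window != null && !window.isEmpty()) {
      parts.add(window);
    }
    if (pane != null && !pane.isEmpty()) {
      parts.add(pane);
    }
    return "-" + String.join("-", parts);
  }

  public static void main(String[] args) {
    // Global window, single pane: prints "-0000-of-0005" with no trailing dashes.
    System.out.println(join("0000-of-0005", "", ""));
    System.out.println(join("0000-of-0005", "2017-05-11T15:00:05Z", "pane-0"));
  }
}
```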

On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <re...@google.com.invalid> wrote:
> Another idea - we can extend the existing pattern that
> DefaultFilenamePolicy understands to include windows.
>
> Today it replaces SSS with the shard, and NNN with the number of shards (so
> many templates contain -SSS-of-NNN). We could also have it recognize WWW
> and PPP, for the window and the pane respectively.
>
> I believe this would be a backwards-compatible change. We do not need to
> change any existing interfaces, we would simply be allowing the default
> policy to work on windows.

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Reuven Lax <re...@google.com.INVALID>.
Another idea - we can extend the existing pattern that
DefaultFilenamePolicy understands to include windows.

Today it replaces SSS with the shard number, and NNN with the number of
shards (so many templates contain -SSS-of-NNN). We could also have it
recognize WWW and PPP, for the window and the pane respectively.

I believe this would be a backwards-compatible change. We do not need to
change any existing interfaces; we would simply be allowing the default
policy to work on windows.
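
To make the idea concrete, here is a minimal sketch in plain Java (no Beam
dependency) of how such a template could be expanded. The class name, the
method signature and the string form of the window and pane are assumptions
made for illustration only; this is not the DefaultFilenamePolicy
implementation.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Beam: expands runs of S, N, W and P in a template.
class ShardTemplateSketch {
    // A run of the same placeholder letter is one placeholder; other text is copied through.
    private static final Pattern PLACEHOLDER = Pattern.compile("S+|N+|W+|P+");

    static String expand(String template, int shard, int numShards, String window, String pane) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        int last = 0;
        while (m.find()) {
            // Copy the literal text between the previous placeholder and this one.
            out.append(template, last, m.start());
            String run = m.group();
            switch (run.charAt(0)) {
                case 'S': out.append(zeroPad(shard, run.length())); break;
                case 'N': out.append(zeroPad(numShards, run.length())); break;
                case 'W': out.append(window); break;
                default:  out.append(pane); break; // 'P'
            }
            last = m.end();
        }
        out.append(template, last, template.length());
        return out.toString();
    }

    // Pads to the width of the placeholder run, so SSS becomes 003 for shard 3.
    private static String zeroPad(int value, int width) {
        return String.format(Locale.ROOT, "%0" + width + "d", value);
    }
}
```

With this sketch, expand("-WWW-PPP-SSS-of-NNN", 3, 10, "w0", "pane-0") gives
"-w0-pane-0-003-of-010", and templates without WWW or PPP expand as before.
One caveat of this simple approach: any literal upper-case S, N, W or P in
the template is also treated as a placeholder.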

On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <dh...@google.com> wrote:

> +Eugene, Reuven who reviewed and implemented this code. They may have
> opinions.
>
> Note that changing the default filename policy would be
> backwards-incompatible, so this would either need to go into 2.0.0 (and a
> new RC3) or it would not go in.
>
> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <
> borisha.zivkovic@gmail.com> wrote:
>
>> great JB, thanks
>>
>> I do not mind working on this - let's see if anyone else has additional
>> input.
>>
>> cheers
>>
>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <jb...@nanthrax.net>
>> wrote:
>>
>> > Got it.
>> >
>> > Yes, agree, I think the PerWindowFilesPolicy could be the default and
>> let
>> > the
>> > user provides its own policy if he wants to.
>> >
>> > Regards
>> > JB
>> >
>> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
>> > > Hi  JB,
>> > >
>> > > yes I saw that thread - I also copied your code but did not want to
>> > pollute
>> > > it with my proposal :)
>> > >
>> > > Well ok maybe default FilePerWindow policy for windowedWrites in
>> TextIO
>> > > does not make sense - not sure TBH...
>> > >
>> > > But would it make sense to promote a version of PerWindowFiles from
>> > >
>> > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>> > > so that it is easier to provide some kind of PerWindowFiles filename
>> > > policy..
>> > >
>> > >
>> > > something like (where user does not have to write
>> PerWindowFilesPolicy,
>> > it
>> > > comes with Beam)
>> > >
>> > >
>> > >
>> > > .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
>> > > .withWindowedWrites()
>> > > .withNumShards(1));
>> > >
>> > > not sure if this was already discussed...
>> > >
>> > > cheers
>> > > Borisa
>> > >
>> > >
>> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <jb...@nanthrax.net>
>> > wrote:
>> > >
>> > >> Hi Borisa,
>> > >>
>> > >> You can take a look about the other thread ("Direct runner doesn't
>> seem
>> > to
>> > >> finalize checkpoint "quickly"").
>> > >>
>> > >> It's basically the same point ;)
>> > >>
>> > >> The default trigger (event-time) doesn't fire any data. I'm
>> > investigating
>> > >> the
>> > >> element timestamp and watermark.
>> > >>
>> > >> I'm also playing with that, for instance:
>> > >>
>> > >>
>> > >>
>> > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>> > >>
>> > >> When you use WindowedWrite, you have to provide a filename policy. We
>> > could
>> > >> provide a default one, but not sure it will fit fine (as it depends a
>> > lot
>> > >> about
>> > >> the use cases).
>> > >>
>> > >> Regards
>> > >> JB
>> > >>
>> > >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
>> > >>> Hi guys,
>> > >>>
>> > >>> just playing with reading data from PubSub and writing using TextIO.
>> > >>>
>> > >>> First thing is that it is very hard to get any output - a lot of
>> temp
>> > >> files
>> > >>> written but not always would get final files created.
>> > >>>
>> > >>> So, I am playing with triggers etc... If I do following
>> > >>>
>> > >>> PCollection<String> streamData = p.apply(
>> > >>>         PubsubIO.readStrings().fromTopic("projects/"+ PROJECT_NAME
>> +
>> > >>> "/topics/myTopic"));
>> > >>>
>> > >>>
>> > >>>
>> > >>
>> > streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>> > >>>
>> > >>>
>> > >>
>> > .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
>> > >>>             .withAllowedLateness(Duration.ZERO)
>> > >>>             .discardingFiredPanes())
>> > >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
>> > >>>     .withSuffix(".suff").withNumShards(10));
>> > >>>
>> > >>>     p.run();
>> > >>>
>> > >>> I would expect to see some files in /tmp/ with final results..
>> unless I
>> > >> add
>> > >>> good triggers I usually do not get any data.. only temp files in
>> > >>> /temp/.beam/
>> > >>>
>> > >>> but sometimes when data should be written I get following exception
>> > >>>
>> > >>> Exception in thread "main"
>> > >>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> > >>> java.lang.UnsupportedOperationException: There is no default
>> policy for
>> > >>> windowed file output. Please provide an explicit FilenamePolicy to
>> > >> generate
>> > >>> filenames.
>> > >>> at
>> > >>>
>> > >>
>> > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
>> > >>> at
>> > >>>
>> > >>
>> > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
>> > >>> at
>> > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>> > >>> at
>> > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>> > >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
>> > >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
>> > >>> at Test.main(Test.java:50)
>> > >>>
>> > >>>
>> > >>> Would it make sense to change TextIO so that it does not
>> > >>> use DefaultFilenamePolicy only - but in case there are
>> windowedWrites
>> > and
>> > >>> no filename policy was specified by user it could actually use
>> custom
>> > >>> FilePerWindow policy automatically. I believe today TextIO always
>> > expects
>> > >>> user to specify FilenamePolicy, right?
>> > >>>
>> > >>> Or maybe to have FilePerWindow policy exposed as part of Beam - I
>> > believe
>> > >>> today there are only implementations in tests and examples but
>> nothing
>> > >>> publicly visible, right?
>> > >>>
>> > >>>
>> > >>>
>> > >>> thanks
>> > >>>
>> > >>
>> > >> --
>> > >> Jean-Baptiste Onofré
>> > >> jbonofre@apache.org
>> > >> http://blog.nanthrax.net
>> > >> Talend - http://www.talend.com
>> > >>
>> > >
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > jbonofre@apache.org
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>> >
>>
>
>

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Dan Halperin <dh...@google.com.INVALID>.
+Eugene and Reuven, who reviewed and implemented this code. They may have
opinions.

Note that changing the default filename policy would be
backwards-incompatible, so this would either need to go into 2.0.0 (and a
new RC3) or it would not go in.

On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <borisha.zivkovic@gmail.com
> wrote:

> great JB, thanks
>
> I do not mind working on this - let's see if anyone else has additional
> input.
>
> cheers
>
> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>
> > Got it.
> >
> > Yes, agree, I think the PerWindowFilesPolicy could be the default and let
> > the
> > user provides its own policy if he wants to.
> >
> > Regards
> > JB
> >
> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> > > Hi  JB,
> > >
> > > yes I saw that thread - I also copied your code but did not want to
> > pollute
> > > it with my proposal :)
> > >
> > > Well ok maybe default FilePerWindow policy for windowedWrites in TextIO
> > > does not make sense - not sure TBH...
> > >
> > > But would it make sense to promote a version of PerWindowFiles from
> > >
> > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > > so that it is easier to provide some kind of PerWindowFiles filename
> > > policy..
> > >
> > >
> > > something like (where user does not have to write PerWindowFilesPolicy,
> > it
> > > comes with Beam)
> > >
> > >
> > >
> > > .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> > > .withWindowedWrites()
> > > .withNumShards(1));
> > >
> > > not sure if this was already discussed...
> > >
> > > cheers
> > > Borisa
> > >
> > >
> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <jb...@nanthrax.net>
> > wrote:
> > >
> > >> Hi Borisa,
> > >>
> > >> You can take a look about the other thread ("Direct runner doesn't
> seem
> > to
> > >> finalize checkpoint "quickly"").
> > >>
> > >> It's basically the same point ;)
> > >>
> > >> The default trigger (event-time) doesn't fire any data. I'm
> > investigating
> > >> the
> > >> element timestamp and watermark.
> > >>
> > >> I'm also playing with that, for instance:
> > >>
> > >>
> > >>
> > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > >>
> > >> When you use WindowedWrite, you have to provide a filename policy. We
> > could
> > >> provide a default one, but not sure it will fit fine (as it depends a
> > lot
> > >> about
> > >> the use cases).
> > >>
> > >> Regards
> > >> JB
> > >>
> > >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> > >>> Hi guys,
> > >>>
> > >>> just playing with reading data from PubSub and writing using TextIO.
> > >>>
> > >>> First thing is that it is very hard to get any output - a lot of temp
> > >> files
> > >>> written but not always would get final files created.
> > >>>
> > >>> So, I am playing with triggers etc... If I do following
> > >>>
> > >>> PCollection<String> streamData = p.apply(
> > >>>         PubsubIO.readStrings().fromTopic("projects/"+ PROJECT_NAME +
> > >>> "/topics/myTopic"));
> > >>>
> > >>>
> > >>>
> > >>
> > streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> > >>>
> > >>>
> > >>
> > .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
> > >>>             .withAllowedLateness(Duration.ZERO)
> > >>>             .discardingFiredPanes())
> > >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> > >>>     .withSuffix(".suff").withNumShards(10));
> > >>>
> > >>>     p.run();
> > >>>
> > >>> I would expect to see some files in /tmp/ with final results..
> unless I
> > >> add
> > >>> good triggers I usually do not get any data.. only temp files in
> > >>> /temp/.beam/
> > >>>
> > >>> but sometimes when data should be written I get following exception
> > >>>
> > >>> Exception in thread "main"
> > >>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > >>> java.lang.UnsupportedOperationException: There is no default policy
> for
> > >>> windowed file output. Please provide an explicit FilenamePolicy to
> > >> generate
> > >>> filenames.
> > >>> at
> > >>>
> > >>
> > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> > >>> at
> > >>>
> > >>
> > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> > >>> at
> > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> > >>> at
> > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> > >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> > >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> > >>> at Test.main(Test.java:50)
> > >>>
> > >>>
> > >>> Would it make sense to change TextIO so that it does not
> > >>> use DefaultFilenamePolicy only - but in case there are windowedWrites
> > and
> > >>> no filename policy was specified by user it could actually use custom
> > >>> FilePerWindow policy automatically. I believe today TextIO always
> > expects
> > >>> user to specify FilenamePolicy, right?
> > >>>
> > >>> Or maybe to have FilePerWindow policy exposed as part of Beam - I
> > believe
> > >>> today there are only implementations in tests and examples but
> nothing
> > >>> publicly visible, right?
> > >>>
> > >>>
> > >>>
> > >>> thanks
> > >>>
> > >>
> > >> --
> > >> Jean-Baptiste Onofré
> > >> jbonofre@apache.org
> > >> http://blog.nanthrax.net
> > >> Talend - http://www.talend.com
> > >>
> > >
> >
> > --
> > Jean-Baptiste Onofré
> > jbonofre@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Borisa Zivkovic <bo...@gmail.com>.
great JB, thanks

I do not mind working on this - let's see if anyone else has additional
input.

cheers

On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:

> Got it.
>
> Yes, agree, I think the PerWindowFilesPolicy could be the default and let
> the
> user provides its own policy if he wants to.
>
> Regards
> JB
>
> On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> > Hi  JB,
> >
> > yes I saw that thread - I also copied your code but did not want to
> pollute
> > it with my proposal :)
> >
> > Well ok maybe default FilePerWindow policy for windowedWrites in TextIO
> > does not make sense - not sure TBH...
> >
> > But would it make sense to promote a version of PerWindowFiles from
> >
> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > so that it is easier to provide some kind of PerWindowFiles filename
> > policy..
> >
> >
> > something like (where user does not have to write PerWindowFilesPolicy,
> it
> > comes with Beam)
> >
> >
> >
> > .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> > .withWindowedWrites()
> > .withNumShards(1));
> >
> > not sure if this was already discussed...
> >
> > cheers
> > Borisa
> >
> >
> > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
> >
> >> Hi Borisa,
> >>
> >> You can take a look about the other thread ("Direct runner doesn't seem
> to
> >> finalize checkpoint "quickly"").
> >>
> >> It's basically the same point ;)
> >>
> >> The default trigger (event-time) doesn't fire any data. I'm
> investigating
> >> the
> >> element timestamp and watermark.
> >>
> >> I'm also playing with that, for instance:
> >>
> >>
> >>
> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>
> >> When you use WindowedWrite, you have to provide a filename policy. We
> could
> >> provide a default one, but not sure it will fit fine (as it depends a
> lot
> >> about
> >> the use cases).
> >>
> >> Regards
> >> JB
> >>
> >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> >>> Hi guys,
> >>>
> >>> just playing with reading data from PubSub and writing using TextIO.
> >>>
> >>> First thing is that it is very hard to get any output - a lot of temp
> >> files
> >>> written but not always would get final files created.
> >>>
> >>> So, I am playing with triggers etc... If I do following
> >>>
> >>> PCollection<String> streamData = p.apply(
> >>>         PubsubIO.readStrings().fromTopic("projects/"+ PROJECT_NAME +
> >>> "/topics/myTopic"));
> >>>
> >>>
> >>>
> >>
> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >>>
> >>>
> >>
> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
> >>>             .withAllowedLateness(Duration.ZERO)
> >>>             .discardingFiredPanes())
> >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >>>     .withSuffix(".suff").withNumShards(10));
> >>>
> >>>     p.run();
> >>>
> >>> I would expect to see some files in /tmp/ with final results.. unless I
> >> add
> >>> good triggers I usually do not get any data.. only temp files in
> >>> /temp/.beam/
> >>>
> >>> but sometimes when data should be written I get following exception
> >>>
> >>> Exception in thread "main"
> >>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >>> java.lang.UnsupportedOperationException: There is no default policy for
> >>> windowed file output. Please provide an explicit FilenamePolicy to
> >> generate
> >>> filenames.
> >>> at
> >>>
> >>
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> >>> at
> >>>
> >>
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> >>> at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >>> at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >>> at Test.main(Test.java:50)
> >>>
> >>>
> >>> Would it make sense to change TextIO so that it does not
> >>> use DefaultFilenamePolicy only - but in case there are windowedWrites
> and
> >>> no filename policy was specified by user it could actually use custom
> >>> FilePerWindow policy automatically. I believe today TextIO always
> expects
> >>> user to specify FilenamePolicy, right?
> >>>
> >>> Or maybe to have FilePerWindow policy exposed as part of Beam - I
> believe
> >>> today there are only implementations in tests and examples but nothing
> >>> publicly visible, right?
> >>>
> >>>
> >>>
> >>> thanks
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbonofre@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
> >
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Got it.

Yes, agreed. I think the PerWindowFilesPolicy could be the default, while
letting the user provide their own policy if they want to.

Regards
JB

On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> Hi  JB,
>
> yes I saw that thread - I also copied your code but did not want to pollute
> it with my proposal :)
>
> Well ok maybe default FilePerWindow policy for windowedWrites in TextIO
> does not make sense - not sure TBH...
>
> But would it make sense to promote a version of PerWindowFiles from
> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> so that it is easier to provide some kind of PerWindowFiles filename
> policy..
>
>
> something like (where user does not have to write PerWindowFilesPolicy, it
> comes with Beam)
>
>
>
> .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> .withWindowedWrites()
> .withNumShards(1));
>
> not sure if this was already discussed...
>
> cheers
> Borisa
>
>
> On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>
>> Hi Borisa,
>>
>> You can take a look about the other thread ("Direct runner doesn't seem to
>> finalize checkpoint "quickly"").
>>
>> It's basically the same point ;)
>>
>> The default trigger (event-time) doesn't fire any data. I'm investigating
>> the
>> element timestamp and watermark.
>>
>> I'm also playing with that, for instance:
>>
>>
>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>>
>> When you use WindowedWrite, you have to provide a filename policy. We could
>> provide a default one, but not sure it will fit fine (as it depends a lot
>> about
>> the use cases).
>>
>> Regards
>> JB
>>
>> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
>>> Hi guys,
>>>
>>> just playing with reading data from PubSub and writing using TextIO.
>>>
>>> First thing is that it is very hard to get any output - a lot of temp
>> files
>>> written but not always would get final files created.
>>>
>>> So, I am playing with triggers etc... If I do following
>>>
>>> PCollection<String> streamData = p.apply(
>>>         PubsubIO.readStrings().fromTopic("projects/"+ PROJECT_NAME +
>>> "/topics/myTopic"));
>>>
>>>
>>>
>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>>>
>>>
>> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
>>>             .withAllowedLateness(Duration.ZERO)
>>>             .discardingFiredPanes())
>>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
>>>     .withSuffix(".suff").withNumShards(10));
>>>
>>>     p.run();
>>>
>>> I would expect to see some files in /tmp/ with final results.. unless I
>> add
>>> good triggers I usually do not get any data.. only temp files in
>>> /temp/.beam/
>>>
>>> but sometimes when data should be written I get following exception
>>>
>>> Exception in thread "main"
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> java.lang.UnsupportedOperationException: There is no default policy for
>>> windowed file output. Please provide an explicit FilenamePolicy to
>> generate
>>> filenames.
>>> at
>>>
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
>>> at
>>>
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
>>> at Test.main(Test.java:50)
>>>
>>>
>>> Would it make sense to change TextIO so that it does not
>>> use DefaultFilenamePolicy only - but in case there are windowedWrites and
>>> no filename policy was specified by user it could actually use custom
>>> FilePerWindow policy automatically. I believe today TextIO always expects
>>> user to specify FilenamePolicy, right?
>>>
>>> Or maybe to have FilePerWindow policy exposed as part of Beam - I believe
>>> today there are only implementations in tests and examples but nothing
>>> publicly visible, right?
>>>
>>>
>>>
>>> thanks
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Borisa Zivkovic <bo...@gmail.com>.
Hi JB,

yes I saw that thread - I also copied your code but did not want to pollute
it with my proposal :)

Well, OK, maybe a default FilePerWindow policy for windowedWrites in TextIO
does not make sense - not sure TBH...

But would it make sense to promote a version of PerWindowFiles from
https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
into Beam, so that it is easier to provide some kind of PerWindowFiles
filename policy?


Something like this (where the user does not have to write
PerWindowFilesPolicy, because it comes with Beam):



.withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
.withWindowedWrites()
.withNumShards(1));
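
As a rough illustration of the names such a built-in policy could produce,
here is a sketch in plain Java without any Beam dependency. The class name,
the method and the exact name layout (prefix, window start, window end,
shard, shard count, suffix) are hypothetical choices for this example; they
are not an existing Beam API and not necessarily what JmsToHdfs does.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical naming helper, not part of Beam.
class PerWindowNameSketch {
    // Second resolution, because the windows in this thread are only 5 seconds long.
    private static final DateTimeFormatter TIME =
        DateTimeFormatter.ofPattern("HHmmss", Locale.ROOT).withZone(ZoneOffset.UTC);

    // Builds <prefix>-<windowStart>-<windowEnd>-<shard>-of-<numShards><suffix>.
    static String filename(String prefix, Instant windowStart, Instant windowEnd,
                           int shard, int numShards, String suffix) {
        return String.format(Locale.ROOT, "%s-%s-%s-%d-of-%d%s",
            prefix, TIME.format(windowStart), TIME.format(windowEnd),
            shard, numShards, suffix);
    }
}
```

For a window from 15:01:30 to 15:01:35 UTC, prefix "/tmp/abc", a single
shard and suffix ".suff", this gives "/tmp/abc-150130-150135-0-of-1.suff".
The date is left out here to keep the sketch short, so names would repeat
from one day to the next.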

not sure if this was already discussed...

cheers
Borisa


On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:

> Hi Borisa,
>
> You can take a look about the other thread ("Direct runner doesn't seem to
> finalize checkpoint "quickly"").
>
> It's basically the same point ;)
>
> The default trigger (event-time) doesn't fire any data. I'm investigating
> the
> element timestamp and watermark.
>
> I'm also playing with that, for instance:
>
>
> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>
> When you use WindowedWrite, you have to provide a filename policy. We could
> provide a default one, but not sure it will fit fine (as it depends a lot
> about
> the use cases).
>
> Regards
> JB
>
> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> > Hi guys,
> >
> > just playing with reading data from PubSub and writing using TextIO.
> >
> > First thing is that it is very hard to get any output - a lot of temp
> files
> > written but not always would get final files created.
> >
> > So, I am playing with triggers etc... If I do following
> >
> > PCollection<String> streamData = p.apply(
> >         PubsubIO.readStrings().fromTopic("projects/"+ PROJECT_NAME +
> > "/topics/myTopic"));
> >
> >
> >
> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >
> >
> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
> >             .withAllowedLateness(Duration.ZERO)
> >             .discardingFiredPanes())
> >     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >     .withSuffix(".suff").withNumShards(10));
> >
> >     p.run();
> >
> > I would expect to see some files in /tmp/ with final results.. unless I
> add
> > good triggers I usually do not get any data.. only temp files in
> > /temp/.beam/
> >
> > but sometimes when data should be written I get following exception
> >
> > Exception in thread "main"
> > org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > java.lang.UnsupportedOperationException: There is no default policy for
> > windowed file output. Please provide an explicit FilenamePolicy to
> generate
> > filenames.
> > at
> >
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> > at
> >
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> > at Test.main(Test.java:50)
> >
> >
> > Would it make sense to change TextIO so that it does not
> > use DefaultFilenamePolicy only - but in case there are windowedWrites and
> > no filename policy was specified by user it could actually use custom
> > FilePerWindow policy automatically. I believe today TextIO always expects
> > user to specify FilenamePolicy, right?
> >
> > Or maybe to have FilePerWindow policy exposed as part of Beam - I believe
> > today there are only implementations in tests and examples but nothing
> > publicly visible, right?
> >
> >
> >
> > thanks
> >
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: TextIO and .withWindowedWrites() - filenamepolicy

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Borisa,

You can take a look at the other thread ("Direct runner doesn't seem to
finalize checkpoint "quickly"").

It's basically the same point ;)

The default trigger (event-time) doesn't fire any data. I'm investigating the 
element timestamp and watermark.
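
For context, the default trigger emits a window's pane only once the
watermark passes the end of that window, so if the watermark does not
advance, nothing is emitted. A simplified model of that rule in plain Java
(hypothetical class and method names, fixed windows with no offset,
timestamps in milliseconds; not Beam code):

```java
// Simplified model of fixed windows and the default event-time trigger; not Beam code.
class DefaultTriggerSketch {
    // Start of the fixed window containing the given element timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // True once the watermark has reached the end of the element's window.
    static boolean wouldFire(long timestampMillis, long sizeMillis, long watermarkMillis) {
        long windowEnd = windowStart(timestampMillis, sizeMillis) + sizeMillis;
        return watermarkMillis >= windowEnd;
    }
}
```

With 5-second windows, an element at 12345 ms belongs to the window
starting at 10000 ms, and in this model it is emitted only when the
watermark reaches 15000 ms.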

I'm also playing with that, for instance:

https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java

When you use WindowedWrite, you have to provide a filename policy. We could
provide a default one, but I'm not sure it would fit well (as it depends a lot
on the use case).

Regards
JB

On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> Hi guys,
>
> just playing with reading data from PubSub and writing using TextIO.
>
> First thing is that it is very hard to get any output - a lot of temp files
> written but not always would get final files created.
>
> So, I am playing with triggers etc... If I do following
>
> PCollection<String> streamData = p.apply(
>         PubsubIO.readStrings().fromTopic("projects/"+ PROJECT_NAME +
> "/topics/myTopic"));
>
>
> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>
> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
>             .withAllowedLateness(Duration.ZERO)
>             .discardingFiredPanes())
>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
>     .withSuffix(".suff").withNumShards(10));
>
>     p.run();
>
> I would expect to see some files in /tmp/ with final results.. unless I add
> good triggers I usually do not get any data.. only temp files in
> /temp/.beam/
>
> but sometimes when data should be written I get following exception
>
> Exception in thread "main"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.UnsupportedOperationException: There is no default policy for
> windowed file output. Please provide an explicit FilenamePolicy to generate
> filenames.
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> at Test.main(Test.java:50)
>
>
> Would it make sense to change TextIO so that it does not
> use DefaultFilenamePolicy only - but in case there are windowedWrites and
> no filename policy was specified by user it could actually use custom
> FilePerWindow policy automatically. I believe today TextIO always expects
> user to specify FilenamePolicy, right?
>
> Or maybe to have FilePerWindow policy exposed as part of Beam - I believe
> today there are only implementations in tests and examples but nothing
> publicly visible, right?
>
>
>
> thanks
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com