Posted to dev@beam.apache.org by Eugene Kirpichov <ki...@google.com.INVALID> on 2017/09/01 01:32:07 UTC

Re: Proposal: file-based IOs should support readAllMatches()

I sent a PR covering all of this:
https://github.com/apache/beam/pull/3799

On Mon, Aug 28, 2017 at 8:45 AM Eugene Kirpichov <ki...@google.com>
wrote:

> Thanks. I agree that file-based IOs (at least the widely used ones)
> should, for convenience, still provide FooIO.read().from(filepattern) and,
> for performance, implement it via a BoundedSource until SDF has full
> support in all runners.
>
> The second case with Create.of(filepattern) illustrates when the
> filepattern is not known at construction time but rather there's a
> collection of filepatterns: it's a separate use case.
>
> On Mon, Aug 28, 2017 at 2:23 AM Etienne Chauchot <ec...@gmail.com>
> wrote:
>
>> Hi Eugene,
>>
>> +1 to this. It is nice to add this common behavior to all the file-based
>> IOs. I find the design elegant. I have just one minor API comment: I
>> would prefer
>>
>> p.apply(FooIO.read().from(filepattern))
>>
>> to
>>
>> p.apply(Create.of(filepattern))
>>
>> IMHO, it is more readable and analogous to the other APIs.
>>
>> Etienne
>>
>> On 18/08/2017 at 23:38, Eugene Kirpichov wrote:
>> > Hi all,
>> >
>> > I've been adding new features to TextIO and AvroIO recently, see e.g.
>> > https://github.com/apache/beam/pull/3725. The features are:
>> > - withHintMatchesManyFiles()
>> > - readAll() that reads a PCollection of filepatterns
>> > - configurable treatment of filepatterns that match no files
>> > - watchForNewFiles() that incrementally watches for new files matching
>> > the filepatterns
>> >
>> > However, these features also make sense for other file-based IOs
>> > (TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them explicitly
>> > to each of these requires a lot of boilerplate and reeks of lack of
>> > modularity. I don't want to add this much duplicated code to each of them,
>> > nor to require authors of new such IOs to add it.
>> >
>> > Note that all of these features are available on the recently added
>> > Match.filepatterns() transform, which converts a PCollection<String> to a
>> > PCollection<MatchResult.Metadata> (file path and size). The boilerplate in
>> > file-based IOs ends up simply passing on the properties to the Match
>> > transform.
>> >
>> > Because of this, I'd like to propose the following recommendation for
>> > file-based IOs:
>> > A file-based FooIO should include:
>> > - A read transform that reads the data from a filepattern specified at
>> > pipeline construction time - FooIO.read().from(filepattern) or something
>> > analogous, as a PCollection<Foo>
>> > - A transform FooIO.readAllMatches() that converts a PCollection<Metadata>
>> > to PCollection<Foo>
>> >
>> > Then FooIO.read() handles the common case, and the user can solve all
>> > advanced cases by combining Match.filepatterns() with
>> > FooIO.readAllMatches():
>> >
>> > // Read files in a filepattern but don't fail if it's empty
>> > PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
>> >     .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
>> >     .apply(FooIO.readAllMatches());
>> >
>> > // Read new filepatterns arriving over PubSub, and for each filepattern
>> > // continuously watch for new files matching it, polling every 1 minute
>> > // and stop polling a filepattern if no new files appear for 5 minutes
>> > PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
>> > PCollection<Foo> foos = filepatterns
>> >     .apply(Match.filepatterns().continuously(
>> >         Duration.standardMinutes(1),
>> >         afterTimeSinceNewOutput(Duration.standardMinutes(5))))
>> >     .apply(FooIO.readAllMatches());
>> >
>> > Adding explicit support for these configuration options to FooIO.read(),
>> > and adding a FooIO.readAll() should be optional.
>> >
>> > WDYT?
>> >
>>
>>

Re: Proposal: file-based IOs should support readAllMatches()

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
The PR is in.

Now you can write code like the following to use XmlIO to watch for new
files, even though XmlIO itself does not support this.

   PCollection<ReadableFile> files = p
       .apply(FileIO.match()
           .filepattern(options.getInputFilepatternProvider())
           .continuously(
               Duration.standardSeconds(30),
               afterTimeSinceNewOutput(Duration.standardMinutes(5))))
       .apply(FileIO.readMatches().withCompression(GZIP));

   PCollection<Record> output = files.apply(XmlIO.<Record>readFiles()
       .withRootElement("root")
       .withRecordElement("record")
       .withRecordClass(Record.class));

Or, you can write new file-based IOs by implementing only a readFiles()
version (as XmlIO does above) - you'll get the following features for free,
via FileIO.match() and FileIO.readMatches():
- Watching for new files
- Handling compressed files
- Customizable handling of filepatterns that match no files or filepatterns
that match a directory

Or, you can write code to do arbitrary things with collections of files -
e.g. it's now quite easy for a user to implement reading a collection of
text files with filenames and line numbers, by simply mapping the result of
FileIO.readMatches() with a ParDo that parses each ReadableFile in this way.
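
As a rough illustration of the "filenames and line numbers" idea, here is a
minimal sketch of the per-file logic such a ParDo might run. It uses only the
JDK so that it can be run on its own; the Beam wiring (the DoFn and how a
reader is obtained from a ReadableFile) is deliberately left out. The class
name LineNumberer, the method numberLines, and the "name:line: text" output
format are all hypothetical, not part of Beam.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Beam API: the body of a DoFn that processes
// one file could delegate to a method like numberLines().
class LineNumberer {

    // Tags every line of one file with the file name and a 1-based line number.
    static List<String> numberLines(String filename, Reader contents) {
        List<String> tagged = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(contents)) {
            String line;
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                tagged.add(filename + ":" + lineNumber + ": " + line);
            }
        } catch (IOException e) {
            // Rethrow unchecked so callers need no checked-exception handling.
            throw new UncheckedIOException(e);
        }
        return tagged;
    }

    public static void main(String[] args) {
        // A StringReader stands in for the contents of a matched file.
        for (String s : numberLines("notes.txt", new StringReader("first\nsecond\n"))) {
            System.out.println(s);
        }
    }
}
```

In a real pipeline the file name and the reader would come from the
ReadableFile element, and each tagged line would be emitted as an output
element rather than collected into a list.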
