Posted to dev@beam.apache.org by Reuven Lax <re...@google.com.INVALID> on 2017/05/19 05:31:28 UTC

Dynamic file-based sinks

While Beam now supports file-based sinks that can depend on the current
window, I've seen interest in value-dependent sinks as well (and there's a
long-standing JIRA for this). I wrote up a short API proposal for this for
discussion on the list.

https://docs.google.com/document/d/1Bd9mJO1YC8vOoFObJFupVURBMCl7jWt6hOgw6ClwxE4/edit?usp=sharing

Reuven

Re: Dynamic file-based sinks

Posted by Josh <jo...@gmail.com>.
That's great news, thanks Reuven! I will try this out soon.

On Sat, Jul 29, 2017 at 2:33 AM, Reuven Lax <re...@google.com.invalid>
wrote:

> The AvroIO PR is now merged, so you can write to different destinations
> based on the value. It's available in head, and will be in Beam 2.2.0.

Re: Dynamic file-based sinks

Posted by Reuven Lax <re...@google.com.INVALID>.
The AvroIO PR is now merged, so you can write to different destinations
based on the value. It's available in head, and will be in Beam 2.2.0.
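For readers new to the thread, here is a rough sketch of what "write to different destinations based on the value" amounts to. It is illustrative Python, not the Beam API: `route_by_value` and `destination_fn` are hypothetical names, and only the routing step (element to destination key, then grouping) is modeled, with no real sink behind it.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, List, TypeVar

T = TypeVar("T")


def route_by_value(
    elements: Iterable[T],
    destination_fn: Callable[[T], Hashable],
) -> Dict[Hashable, List[T]]:
    """Group elements by the destination computed from each value.

    Hypothetical helper: it models only the routing step, not a real sink.
    """
    groups: Dict[Hashable, List[T]] = defaultdict(list)
    for element in elements:
        groups[destination_fn(element)].append(element)
    return dict(groups)


# Example: route events to a per-country destination.
events = [
    {"country": "us", "id": 1},
    {"country": "de", "id": 2},
    {"country": "us", "id": 3},
]
by_country = route_by_value(events, lambda e: e["country"])
```

Each group would then be written to its own set of files; how those files are named is the part the proposal discussion is about.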

On Wed, Jul 26, 2017 at 10:00 AM, Reuven Lax <re...@google.com> wrote:

> Yes, there was! TextIO support is already merged into Beam (it missed the
> 2.1 cutoff, so it will be in Beam 2.2.0). AvroIO support is in
> https://github.com/apache/beam/pull/3541. This is almost ready to merge -
> still waiting for final review from kennknowles on the Beam translation
> changes.
>
> Nobody is working on BigtableIO yet; however, the framework used in
> BigQueryIO, TextIO, and AvroIO should be easy to generalize to other sinks.

Re: Dynamic file-based sinks

Posted by Reuven Lax <re...@google.com.INVALID>.
Yes, there was! TextIO support is already merged into Beam (it missed the
2.1 cutoff, so it will be in Beam 2.2.0). AvroIO support is in
https://github.com/apache/beam/pull/3541. This is almost ready to merge -
still waiting for final review from kennknowles on the Beam translation
changes.

Nobody is working on BigtableIO yet; however, the framework used in
BigQueryIO, TextIO, and AvroIO should be easy to generalize to other sinks.
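One way to picture a sink-agnostic version of that framework is to keep "where an element goes" separate from "how it is encoded", as suggested elsewhere in this thread. The sketch below is only an assumption about the shape such a thing could take: `DynamicWrite` and its three fields are hypothetical names, not Beam classes, and the "table-..." locations are made up for the example.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Generic, Iterable, List, TypeVar

UserT = TypeVar("UserT")
DestT = TypeVar("DestT")


@dataclass
class DynamicWrite(Generic[UserT, DestT]):
    """Hypothetical routing config: where each element goes and how it is encoded."""

    get_destination: Callable[[UserT], DestT]  # element -> destination key
    get_location: Callable[[DestT], str]       # destination -> file or table name
    format_record: Callable[[UserT], str]      # element -> encoded record

    def plan(self, elements: Iterable[UserT]) -> Dict[str, List[str]]:
        """Return the encoded records grouped by target location."""
        out: Dict[str, List[str]] = {}
        for element in elements:
            location = self.get_location(self.get_destination(element))
            out.setdefault(location, []).append(self.format_record(element))
        return out


# Example: one table per tenant, records encoded as "key=value".
write = DynamicWrite(
    get_destination=lambda row: row["tenant"],
    get_location=lambda tenant: f"table-{tenant}",
    format_record=lambda row: f"{row['key']}={row['value']}",
)
planned = write.plan([
    {"tenant": "a", "key": "k1", "value": 1},
    {"tenant": "b", "key": "k2", "value": 2},
    {"tenant": "a", "key": "k3", "value": 3},
])
```

Because the location and format functions are independent, swapping a file name for a table name would not touch the routing logic, which is roughly why the approach looks generalizable.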

On Wed, Jul 26, 2017 at 7:41 AM, Josh <jo...@gmail.com> wrote:

> Hi all,
>
> Was there any progress on this recently? I am particularly interested in
> using value-dependent destinations in BigtableIO (writing to a specific
> table depending on the value) and AvroIO (writing to specific GCS buckets
> depending on the value).
>
> Thanks,
> Josh

Re: Dynamic file-based sinks

Posted by Josh <jo...@gmail.com>.
Hi all,

Was there any progress on this recently? I am particularly interested in
using value-dependent destinations in BigtableIO (writing to a specific
table depending on the value) and AvroIO (writing to specific GCS buckets
depending on the value).

Thanks,
Josh

On Fri, Jun 9, 2017 at 5:35 PM, Reuven Lax <re...@google.com.invalid> wrote:

> I'm putting together a proof-of-concept PR for option 1 to see how it
> looks.
>
> On Thu, Jun 8, 2017 at 4:07 PM, Reuven Lax <re...@google.com> wrote:
>
> > After looking at everyone's comments, I think option 1 is the better
> > approach - map destinations to a FilenamePolicy. It is a good parallel to
> > what we do in BigQueryIO (the main difference is that we're mapping to a
> > sharded filename, instead of a single destination like in BigQueryIO).
> >
> > The main limitation is that numShards cannot be dynamic per destination. I
> > think that's fine for two reasons:
> >
> > 1. We generally discourage people from statically setting numShards, as
> > often runner-determined sharding is better.
> > 2. In a case where users know that certain types of output files need a
> > different number of shards, they can always partition. e.g. partition into
> > a 10-shard and a 100-shard sink, with each sink writing dynamic files.
> >
> > Eugene also brought up destination directory, but that part of the
> > FilenamePolicy interface is more a hint than anything else.
> > DestinationDirectory is realistically just the base directory for the temp
> > files, and the FilenamePolicy is free to ignore it.
> >
> > Reuven
> >
> > On Wed, May 24, 2017 at 1:54 PM, Eugene Kirpichov <
> > kirpichov@google.com.invalid> wrote:
> >
> >> Hmm, on one hand this looks syntactically very appealing, on the other
> >> hand, it's icky to have a function return a PTransform at runtime, only to
> >> have some information be immediately extracted from that transform.
> >> Moreover, not all TextIO.Write transforms will be legal to return - e.g.
> >> most likely you're not allowed to return a transform that itself uses
> >> dynamic destinations.
> >>
> >> We should think more about how to decompose this problem.
> >> I think there are 2 natural elements to writing files:
> >> 1) where to put the files (let's call this file location)
> >> 2) how to write to a single file (let's call this file format. In case of
> >> Avro, this may theoretically include e.g. schema to be embedded in the
> >> file).
> >> These should be represented by different interfaces/classes in the API.
> >>
> >> Then:
> >> - Writing a set of elements to a single file location using a single file
> >> format = "write operation"
> >> - WriteFiles is able to route different elements to different write
> >> operations, potentially with both different locations and formats. I.e.
> >> it's configured by something like BQ's DynamicDestinations
> >> - TextIO and AvroIO are thin wrappers over WriteFiles
> >> - AvroIO in the future may be extended to support different schemas for
> >> different files - then it would be even more like BigQuery: it'd also take
> >> a SerializableFunction<T, GenericRecord> and a
> >> SerializableFunction<DestinationT, Schema>. That means that perhaps it may
> >> provide its own DynamicDestinations-like API to its users, more specific
> >> than the one exposed by low-level WriteFiles.
> >>
> >> This is pretty vague, but I think "AvroIO with dynamic schema and with
> >> (type of input PCollection = T) != (type being written = GenericRecord)"
> >> is a good target to guide search for the perfect API. WDYT?
> >>
> >> On Wed, May 24, 2017 at 11:24 AM Reuven Lax <re...@google.com.invalid>
> >> wrote:
> >>
> >> > Did you see that I modified the second proposal so that users can map
> >> > DestinationT to the actual PTransform (i.e. DestinationT->TextIO or
> >> > DestinationT->AvroIO)? This means that users do not have to deal with
> >> > FileBasedSink or even know it exists.
> >> >
> >> > I prefer the second approach for two reasons:
> >> >
> >> > 1. It allows customizing some useful things that the FilenamePolicy does
> >> > not. e.g. it's very reasonable to want to customize the output directory
> >> > and have a different number of output shards for each directory. If the
> >> > function returns a TextIO or AvroIO they can do that. If there's simply a
> >> > mapping to a FilenamePolicy, they can't do that.
> >> >
> >> > 2. The majority of users don't need to deal with DefaultFilenamePolicy
> >> > today. Allowing them to use the TextIO etc. builders for this will be
> >> > more familiar than the DefaultFilenamePolicy.Config option suggested.
> >> >
> >> > On Wed, May 24, 2017 at 10:59 AM, Kenneth Knowles
> >> > <kl...@google.com.invalid> wrote:
> >> >
> >> > > I commented a little in the doc, but I want to reply on list because this
> >> > > is a really great feature.
> >> > >
> >> > > The two alternatives, as I understand them, both include mapping your
> >> > > elements to an intermediate DestinationT that you can group by before
> >> > > writing. Then the big picture decision is whether to map each DestinationT
> >> > > to a different FilenamePolicy (which may need to be made more powerful) or
> >> > > map each DestinationT to a different FileBasedSink.
> >> > >
> >> > > I think both are reasonable, modulo pitfalls that I'm probably glossing
> >> > > over. I favor the FilenamePolicy version a bit, because it is focused just
> >> > > on the file names, whereas the FileBasedSink version seems a bit
> >> > > overpowered for the use case. The other consideration is that
> >> > > FilenamePolicy is intended for user consumption, while FileBasedSink is not
> >> > > so much.
> >> > >
> >> > > Kenn
> >> > >
> >> > > On Thu, May 18, 2017 at 10:31 PM, Reuven Lax <relax@google.com.invalid>
> >> > > wrote:
> >> > >
> >> > > > While Beam now supports file-based sinks that can depend on the current
> >> > > > window, I've seen interest in value-dependent sinks as well (and there's a
> >> > > > long-standing JIRA for this). I wrote up a short API proposal for this for
> >> > > > discussion on the list.
> >> > > >
> >> > > > https://docs.google.com/document/d/1Bd9mJO1YC8vOoFObJFupVURBMCl7jWt6hOgw6ClwxE4/edit?usp=sharing
> >> > > >
> >> > > > Reuven
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

Re: Dynamic file-based sinks

Posted by Reuven Lax <re...@google.com.INVALID>.
I'm putting together a proof-of-concept PR for option 1 to see how it
looks.
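To make option 1 from the quoted message below a bit more concrete (map each destination to a filename policy that yields sharded filenames), here is a small illustrative sketch. It is Python rather than the Java SDK, and every name in it (`make_filename_policy`, `policy_for_destination`, `sharded_filenames`, the "/output/..." paths and the "part-NNNNN-of-NNNNN" pattern) is an assumption for the example, not the actual FilenamePolicy interface.

```python
from typing import Callable, Dict, List

# A "filename policy" here is just a function: (shard index, shard count) -> name.
FilenamePolicy = Callable[[int, int], str]


def make_filename_policy(base_dir: str, suffix: str) -> FilenamePolicy:
    """Build a policy that names shards under base_dir (hypothetical layout)."""

    def policy(shard: int, num_shards: int) -> str:
        return f"{base_dir}/part-{shard:05d}-of-{num_shards:05d}{suffix}"

    return policy


def policy_for_destination(destination: str) -> FilenamePolicy:
    """Hypothetical mapping from a destination key to its filename policy."""
    return make_filename_policy(f"/output/{destination}", ".txt")


def sharded_filenames(destinations: List[str], num_shards: int) -> Dict[str, List[str]]:
    """List each destination's shard names; num_shards is fixed for the whole sink."""
    return {
        d: [policy_for_destination(d)(i, num_shards) for i in range(num_shards)]
        for d in destinations
    }


names = sharded_filenames(["us", "de"], num_shards=2)

# The partition workaround: two sinks, each with its own fixed shard count.
small_sink = sharded_filenames(["de"], num_shards=10)
large_sink = sharded_filenames(["us"], num_shards=100)
```

Note that `num_shards` is an argument of the sink as a whole, not of the per-destination policy, which mirrors the limitation discussed below; the last two lines show the suggested workaround of partitioning into separately configured sinks.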

On Thu, Jun 8, 2017 at 4:07 PM, Reuven Lax <re...@google.com> wrote:

> After looking at everyone's comments, I think option 1 is the better
> approach - map destinations to a FilenamePolicy. It is a good parallel to
> what we do in BigQueryIO (the main difference is that we're mapping to a
> sharded filename, instead of a single destination like in BigQueryIO).
>
> The main limitation is that numShards cannot be dynamic per destination. I
> think that's fine for two reasons:
>
> 1. We generally discourage people from statically setting numShards, as
> often runner-determined sharding is better.
> 2. In a case where users know that certain types of output files need a
> different number of shards, they can always partition. e.g. partition into
> a 10-shard and a 100-shard sink, with each sink writing dynamic files.
>
> Eugene also brought up destination directory, but that part of the
> FilenamePolicy interface is more a hint than anything else.
> DestinationDirectory is realistically just the base directory for the temp
> files, and the FilenamePolicy is free to ignore it.
>
> Reuven

Re: Dynamic file-based sinks

Posted by Reuven Lax <re...@google.com.INVALID>.
After looking at everyone's comments, I think option 1 is the better
approach - map destinations to a FilenamePolicy. It is a good parallel to
what we do in BigQueryIO (the main difference is that we're mapping to a
sharded filename, instead of a single destination like in BigQueryIO).

The main limitation is that numShards cannot be dynamic per destination. I
think that's fine for two reasons:

1. We generally discourage people from statically setting numShards, as
often runner-determined sharding is better.
2. In a case where users know that certain types of output files need a
different number of shards, they can always partition, e.g. into a
10-shard and a 100-shard sink, with each sink writing dynamic files.

Eugene also brought up destination directory, but that part of the
FilenamePolicy interface is more a hint than anything else.
DestinationDirectory is realistically just the base directory for the temp
files, and the FilenamePolicy is free to ignore it.
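
To make option 1 concrete, here is a minimal Java sketch, not the proposed
API. NamePolicy and PerDestinationNaming are hypothetical stand-ins (they are
not Beam's FilenamePolicy), the destination is assumed to be a plain String,
and the out/<destination>/part-... layout is made up for illustration. The
point is only the shape: the destination picks the naming policy, while the
shard count comes from the sink and is the same for every destination.

```java
import java.util.Locale;
import java.util.function.Function;

// Hypothetical stand-in for a filename policy; NOT Beam's FilenamePolicy.
interface NamePolicy {
    String fileName(int shard, int numShards);
}

final class PerDestinationNaming {
    // Assumed mapping: every destination gets its own naming policy.
    static final Function<String, NamePolicy> POLICY_FOR =
        destination -> (shard, numShards) ->
            String.format(Locale.ROOT, "out/%s/part-%05d-of-%05d.txt",
                destination, shard, numShards);

    // numShards belongs to the sink, so it is passed in rather than
    // looked up per destination.
    static String name(String destination, int shard, int numShards) {
        return POLICY_FOR.apply(destination).fileName(shard, numShards);
    }
}
```

For example, PerDestinationNaming.name("users", 3, 10) gives
out/users/part-00003-of-00010.txt. A pipeline that wants 10 shards for some
outputs and 100 for others would partition first and give each partition its
own sink.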

Reuven

On Wed, May 24, 2017 at 1:54 PM, Eugene Kirpichov <
kirpichov@google.com.invalid> wrote:

> Hmm, on one hand this looks syntactically very appealing, on the other
> hand, it's icky to have a function return a PTransform at runtime, only to
> have some information be immediately extracted from that transform.
> Moreover, not all TextIO.Write transforms will be legal to return - e.g.
> most likely you're not allowed to return a transform that itself uses
> dynamic destinations.
>
> We should think more about how to decompose this problem.
> I think there are 2 natural elements to writing files:
> 1) where to put the files (let's call this file location)
> 2) how to write to a single file (let's call this file format. In case of
> Avro, this may theoretically include e.g. schema to be embedded in the
> file).
> These should be represented by different interfaces/classes in the API.
>
> Then:
> - Writing a set of elements to a single file location using a single file
> format = "write operation"
> - WriteFiles is able to route different elements to different write
> operations, with potentially different locations and formats. I.e.
> it's configured by something like BQ's DynamicDestinations
> - TextIO and AvroIO are thin wrappers over WriteFiles
> - AvroIO in the future may be extended to support different schemas for
> different files - then it would be even more like BigQuery: it'd also take
> a SerializableFunction<T, GenericRecord> and a
> SerializableFunction<DestinationT, Schema>. That means that perhaps it may
> provide its own DynamicDestinations-like API to its users, more specific
> than the one exposed by low-level WriteFiles.
>
> This is pretty vague, but I think "AvroIO with dynamic schema and with
> (type of input PCollection = T) != (type being written = GenericRecord)" is
> a good target to guide the search for the perfect API. WDYT?

Re: Dynamic file-based sinks

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Hmm, on one hand this looks syntactically very appealing, on the other
hand, it's icky to have a function return a PTransform at runtime, only to
have some information be immediately extracted from that transform.
Moreover, not all TextIO.Write transforms will be legal to return - e.g.
most likely you're not allowed to return a transform that itself uses
dynamic destinations.

We should think more about how to decompose this problem.
I think there are 2 natural elements to writing files:
1) where to put the files (let's call this file location)
2) how to write to a single file (let's call this file format. In case of
Avro, this may theoretically include e.g. schema to be embedded in the
file).
These should be represented by different interfaces/classes in the API.

Then:
- Writing a set of elements to a single file location using a single file
format = "write operation"
- WriteFiles is able to route different elements to different write
operations, with potentially different locations and formats. I.e.
it's configured by something like BQ's DynamicDestinations
- TextIO and AvroIO are thin wrappers over WriteFiles
- AvroIO in the future may be extended to support different schemas for
different files - then it would be even more like BigQuery: it'd also take
a SerializableFunction<T, GenericRecord> and a
SerializableFunction<DestinationT, Schema>. That means that perhaps it may
provide its own DynamicDestinations-like API to its users, more specific
than the one exposed by low-level WriteFiles.
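
A rough Java sketch of this decomposition, under stated assumptions:
FileLocation, FileFormat, WriteOperation and Router are hypothetical names
invented here and are not Beam classes, and an in-memory list of lines stands
in for a real file. The per-destination formatOf function plays the role that
a SerializableFunction<DestinationT, Schema> might play for Avro.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// All names below are hypothetical; none of them are Beam classes.

// 1) Where to put the file.
interface FileLocation {
    String path();
}

// 2) How to write a single element into a file.
interface FileFormat<T> {
    String encode(T element);
}

// One location plus one format is one "write operation".
final class WriteOperation<T> {
    final FileLocation location;
    final FileFormat<T> format;
    // Stand-in for the file contents.
    final List<String> lines = new ArrayList<>();

    WriteOperation(FileLocation location, FileFormat<T> format) {
        this.location = location;
        this.format = format;
    }

    void write(T element) {
        lines.add(format.encode(element));
    }
}

final class Router {
    // Routes each element to the write operation of its destination,
    // creating that operation the first time the destination is seen.
    static <T, D> Map<D, WriteOperation<T>> route(
            List<T> elements,
            Function<T, D> destinationOf,
            Function<D, FileLocation> locationOf,
            Function<D, FileFormat<T>> formatOf) {
        Map<D, WriteOperation<T>> operations = new LinkedHashMap<>();
        for (T element : elements) {
            D destination = destinationOf.apply(element);
            operations
                .computeIfAbsent(destination,
                    d -> new WriteOperation<T>(locationOf.apply(d), formatOf.apply(d)))
                .write(element);
        }
        return operations;
    }
}
```

Keeping location and format as separate functions of the destination is what
lets different destinations differ in either one independently.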

This is pretty vague, but I think "AvroIO with dynamic schema and with
(type of input PCollection = T) != (type being written = GenericRecord)" is
a good target to guide the search for the perfect API. WDYT?

On Wed, May 24, 2017 at 11:24 AM Reuven Lax <re...@google.com.invalid>
wrote:

> Did you see that I modified the second proposal so that users can map
> DestinationT to the actual PTransform (i.e. DestinationT->TextIO or
> DestinationT->AvroIO)? This means that users do not have to deal with
> FileBasedSink or even know it exists.
>
> I prefer the second approach for two reasons:
>
> 1. It allows customizing some useful things that the FilenamePolicy does
> not. e.g. it's very reasonable to want to customize the output directory
> and have a different number of output shards for each directory. If the
> function returns a TextIO or AvroIO they can do that. If there's simply a
> mapping to a FilenamePolicy, they can't do that.
>
> 2. The majority of users don't need to deal with DefaultFilenamePolicy
> today. Allowing them to use the TextIO etc. builders for this will be
> more familiar than the DefaultFilenamePolicy.Config option suggested.

Re: Dynamic file-based sinks

Posted by Reuven Lax <re...@google.com.INVALID>.
Did you see that I modified the second proposal so that users can map
DestinationT to the actual PTransform (i.e. DestinationT->TextIO or
DestinationT->AvroIO)? This means that users do not have to deal with
FileBasedSink or even know it exists.

I prefer the second approach for two reasons:

1. It allows customizing some useful things that the FilenamePolicy does
not. e.g. it's very reasonable to want to customize the output directory
and have a different number of output shards for each directory. If the
function returns a TextIO or AvroIO they can do that. If there's simply a
mapping to a FilenamePolicy, they can't do that.

2. The majority of users don't need to deal with DefaultFilenamePolicy
today. Allowing them to use the TextIO etc. builders for this will be
more familiar than the DefaultFilenamePolicy.Config option suggested.
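
As a sketch of what the second approach buys, assuming a made-up builder:
WriteConfig and PerDestinationConfig below are hypothetical and are not
TextIO.Write or AvroIO.Write, and the rule that "logs" gets 100 shards is
invented for the example. The function from destination to builder can vary
both the output directory and the shard count.

```java
import java.util.function.Function;

// Hypothetical builder standing in for a per-destination write transform;
// it is NOT TextIO.Write or AvroIO.Write.
final class WriteConfig {
    final String directory;
    final int numShards;

    private WriteConfig(String directory, int numShards) {
        this.directory = directory;
        this.numShards = numShards;
    }

    // Assumed default of one shard, chosen only to keep the sketch small.
    static WriteConfig to(String directory) {
        return new WriteConfig(directory, 1);
    }

    WriteConfig withNumShards(int numShards) {
        return new WriteConfig(directory, numShards);
    }
}

final class PerDestinationConfig {
    // Made-up rule: the "logs" destination is large and gets more shards.
    static final Function<String, WriteConfig> CONFIG_FOR = destination ->
        WriteConfig.to("out/" + destination)
            .withNumShards(destination.equals("logs") ? 100 : 10);
}
```

A mapping to a filename policy alone has no place to carry the numShards
value, which is the first point above.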

On Wed, May 24, 2017 at 10:59 AM, Kenneth Knowles <kl...@google.com.invalid>
wrote:

> I commented a little in the doc, but I want to reply on list because this is a
> really great feature.
>
> The two alternatives, as I understand them, both include mapping your
> elements to an intermediate DestinationT that you can group by before
> writing. Then the big picture decision is whether to map each DestinationT
> to a different FilenamePolicy (which may need to be made more powerful) or
> map each DestinationT to a different FileBasedSink.
>
> I think both are reasonable, modulo pitfalls that I'm probably glossing
> over. I favor the FilenamePolicy version a bit, because it is focused just
> on the file names, whereas the FileBasedSink version seems a bit
> overpowered for the use case. The other consideration is that
> FilenamePolicy is intended for user consumption, while FileBasedSink is not
> so much.
>
> Kenn

Re: Dynamic file-based sinks

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
I commented a little in the doc, but I want to reply on list because this is a
really great feature.

The two alternatives, as I understand them, both include mapping your
elements to an intermediate DestinationT that you can group by before
writing. Then the big picture decision is whether to map each DestinationT
to a different FilenamePolicy (which may need to be made more powerful) or
map each DestinationT to a different FileBasedSink.
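
The step both alternatives share can be sketched roughly as follows, using
plain Java collections rather than Beam. GroupByDestination is a hypothetical
name, and using the first two characters of each element as its destination
(in the usage note) is an assumption made only for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper; not part of Beam.
final class GroupByDestination {
    // Maps each element to its intermediate destination and groups by it.
    // Whatever is chosen later (a filename policy or a whole sink) would be
    // looked up once per key of the returned map.
    static <T, D> Map<D, List<T>> group(List<T> elements, Function<T, D> destinationOf) {
        Map<D, List<T>> groups = new LinkedHashMap<>();
        for (T element : elements) {
            groups.computeIfAbsent(destinationOf.apply(element), d -> new ArrayList<>())
                  .add(element);
        }
        return groups;
    }
}
```

With elements "us:1", "eu:2", "us:3" and s -> s.substring(0, 2) as the
destination function, this produces two groups, keyed "us" and "eu".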

I think both are reasonable, modulo pitfalls that I'm probably glossing
over. I favor the FilenamePolicy version a bit, because it is focused just
on the file names, whereas the FileBasedSink version seems a bit
overpowered for the use case. The other consideration is that
FilenamePolicy is intended for user consumption, while FileBasedSink is not
so much.

Kenn

On Thu, May 18, 2017 at 10:31 PM, Reuven Lax <re...@google.com.invalid>
wrote:

> While Beam now supports file-based sinks that can depend on the current
> window, I've seen interest in value-dependent sinks as well (and there's a
> long-standing JIRA for this). I wrote up a short API proposal for this for
> discussion on the list.
>
> https://docs.google.com/document/d/1Bd9mJO1YC8vOoFObJFupVURBMCl7jWt6hOgw6ClwxE4/edit?usp=sharing
>
> Reuven
>