Posted to dev@beam.apache.org by Neville Li <ne...@gmail.com> on 2019/07/19 15:15:56 UTC

Sort Merge Bucket - Action Items

Forking this thread to discuss action items regarding the change. We can
keep technical discussion in the original thread.

Background: our SMB POC showed promising performance and cost-saving
improvements, and we'd like to adopt it for production soon (by EOY). We
want to contribute it to Beam so it's better generalized and maintained. We
also want to avoid divergence between our internal version and the PR while
it's in progress, specifically any breaking change in the produced SMB data.

To achieve that I'd like to propose a few action items.

   1. Reach a consensus about bucket and shard strategy, key handling,
   bucket file and metadata format, etc., anything that affects produced SMB
   data.
   2. Revise the existing PR according to #1
   3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression,
   etc., but keep the existing file level abstraction
   4. (Optional) Merge code into extensions:smb but mark it clearly
   as @Experimental
   5. Incorporate ideas from the discussion, e.g. ShardingFn,
   GroupByKeyAndSortValues, FileIO generalization, key URN, etc.

Items #1-4 give us something usable in the short term, while #1 guarantees
that production data produced today are usable when #5 lands on master. #4
also gives early adopters a chance to give feedback.
Due to the scope of #5, it might take much longer and a couple of big PRs
to achieve, which we can keep iterating on.

What are your thoughts on this?
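A rough illustration of what item 1 has to pin down: how a key maps to a bucket, how each bucket is sorted, and how a bucket-shard file is named (using the bucket-00001-shard-00002.avro pattern that appears later in the thread). This is a sketch only. The class names and the use of String.hashCode() as the bucketing hash are assumptions for illustration, not the PR's actual scheme, and it leaves out how the records of one bucket are spread across several shards.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;

// Identifies one output file: a bucket (key partition) plus a shard within it.
final class BucketShardId {
  final int bucket;
  final int shard;

  BucketShardId(int bucket, int shard) {
    this.bucket = bucket;
    this.shard = shard;
  }

  // Zero-padded name following the example pattern bucket-00001-shard-00002.avro.
  String filename(String suffix) {
    return String.format("bucket-%05d-shard-%05d%s", bucket, shard, suffix);
  }
}

final class Bucketing {
  // Placeholder hash (assumption): a real SMB format must fix a stable, documented
  // hash, because consumer pipelines rely on it to line up matching buckets.
  static int bucketFor(String key, int numBuckets) {
    return Math.floorMod(key.hashCode(), numBuckets);
  }

  // Producer side: group records by bucket, then sort each bucket by key.
  static <T> SortedMap<Integer, List<T>> bucketAndSort(
      List<T> records, Function<T, String> keyFn, int numBuckets) {
    SortedMap<Integer, List<T>> buckets = new TreeMap<>();
    for (T record : records) {
      int bucket = bucketFor(keyFn.apply(record), numBuckets);
      buckets.computeIfAbsent(bucket, b -> new ArrayList<>()).add(record);
    }
    for (List<T> bucketRecords : buckets.values()) {
      bucketRecords.sort(Comparator.comparing(keyFn));
    }
    return buckets;
  }
}
```

With four buckets, the keys a, e and i all land in bucket 1 under this placeholder hash and end up in one sorted list. Changing the hash or the bucket count would change the contents of every file, which is why item 1 needs agreement before production data is written.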

On Thu, Jul 18, 2019 at 5:32 AM Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Jul 17, 2019 at 9:12 PM Gleb Kanterov <gl...@spotify.com> wrote:
> >>
> >> Suppose one assigns a sharding function to a PCollection. Is it lazy,
> >> or does it induce a reshuffle right at that point? In either case,
> >> once the ShardingFn has been applied, how long does it remain in
> >> effect? Does it prohibit the runner (or user) from doing subsequent
> >> resharding (including dynamic load balancing)? What happens when one
> >> has a DoFn that changes the value? (Including the DoFns in our sinks
> >> that assign random keys.)
> >
> >
> > What if we would reason about sharding in the same way as we reason
> about timestamps?
> >
> > Please correct me if I am wrong, as I know, in Beam, timestamps exist
> for each element. You can get timestamp by using Reify.timestamps. If there
> are timestamped values, and they go through ParDo, timestamps are preserved.
>
> That is correct.
>
> > We can think of the same with sharding, where Reify.shards would be
> PTransform<PCollection<T>, ShardedValue<T>> and ShardedValue<?> would
> contain shard and a grouping key.
>
> Meaning the shard that the PCollection is currently sharded by, or the
> one that it should be sharded by in the future. (Your use case is a
> bit strange in that a single key may be spread across multiple shards,
> as long as they're part of the same "bucket.")
>
> > ParDo wouldn't change sharding and would propagate ShardingFn.
>
> The ShardingFn may not be applicable to downstream (mutated) elements.
>
> FYI, internally this is handled by having annotations on DoFns as
> being key-preserving, and only reasoning about operations separated by
> such DoFns.
>
> > CoGroupByKey on such PTransforms would reify grouping key, and do
> regular CoGroupByKey, or be rewritten to a regular ParDo if sharding of
> inputs is compatible.
> >
> > As you mentioned, it requires dynamic work rebalancing to preserve
> sharding. What if we do dynamic work rebalancing for each shard
> independently, as, I guess, it's done today for fixed windows.
>
> Currently, the unit of colocation is by key. Generally sharding
> introduces a notion of colocation where multiple keys (or multiple
> elements, I suppose it need not be keyed) are promised to be processed
> by the same machine. This is both too constraining (wrt dynamic
> resharding) and not needed (with respect to SMB, as your "colocation"
> is per bucket, but buckets themselves can be processed in a
> distributed manner).
>
> > When we do a split, we would split one shard into two. It should be
> possible to do consistently if values within buckets are sorted, in this
> case, we would split ranges of possible values.
>
> I'm not quite following here. Suppose one processes elements a, m, and
> z. Then one decides to split the bundle, but there's not a "range" we
> can pick for the "other" as this bundle already spans the whole range.
> But maybe I'm just off in the weeds here.
>
> > On Wed, Jul 17, 2019 at 6:37 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> On Wed, Jul 17, 2019 at 4:26 PM Gleb Kanterov <gl...@spotify.com> wrote:
> >> >
> >> > I find there is an interesting point in the comments brought up by Ahmed
> Eleryan: similar to WindowFn, we could have a concept of ShardingFn that lets
> users implement a class for sharding data. Each Beam node can have
> ShardingFn set, similar to WindowFn (or WindowingStrategy). Sinks and
> sources are aware of that and preserve this information. Using that it's
> possible to do optimization on Beam graph, removing redundant CoGroupByKey,
> and it would be transparent to users.
> >> >
> >> > It feels like a nice addition to the Beam model, or possibly we can
> implement it using existing windowing mechanics. There are people on the
> list with strong experience in the area; I'm wondering what you think.
> >>
> >> I've actually thought about this some, though it's been quite a while.
> >> At the time it seemed hard to work it into a cohesive part of the
> >> model (even ignoring the fact that sharding is primarily an execution,
> >> rather than logical, property).
> >>
> >> Suppose one assigns a sharding function to a PCollection. Is it lazy,
> >> or does it induce a reshuffle right at that point? In either case,
> >> once the ShardingFn has been applied, how long does it remain in
> >> effect? Does it prohibit the runner (or user) from doing subsequent
> >> resharding (including dynamic load balancing)? What happens when one
> >> has a DoFn that changes the value? (Including the DoFns in our sinks
> >> that assign random keys.)
> >>
> >> Right now one can get most of the semantics of sharding by keying by
> >> the shard id and doing a GBK, where the resulting value set (which is
> >> allowed to be arbitrarily big) is the (indivisible) shard (e.g. for
> >> writing to a single file.)
> >>
> >> I think sharding (like ordering, the two are quite related) is a
> >> property that a PCollection can have, and could be leveraged by the
> >> optimizer, but it's difficult to see how it's propagated through
> >> transforms. The most sane way to reason about it IMHO is similar to
> >> sink triggers, where one specifies that one wants a sharding at some
> >> point, and the runner arranges things upstream such that it is so, and
> >> some operations can declare that they happen to produce data sharded
> >> in some way (though again, PCollection to PCollection one needs a
> >> consistent notion of key to have a consistent notion of sharding).
> >>
> >> > Gleb
> >> >
> >> > On Tue, Jul 16, 2019 at 11:34 PM Eugene Kirpichov <
> kirpichov@google.com> wrote:
> >> >>
> >> >> I'd like to reiterate the request to not build anything on top of
> FileBasedSource/Reader.
> >> >> If the design requires having some interface for representing a
> function from a filename to a stream of records, better introduce a new
> interface for that.
> >> >> If it requires interoperability with other IOs that read files,
> better change them to use the new interface.
> >> >>
> >> >> On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath <
> chamikara@google.com> wrote:
> >> >>>
> >> >>> Thanks, this clarifies a lot.
> >> >>>
> >> >>> For writer, I think it's great if you can utilize existing
> FileIO.Sink implementations even if you have to reimplement some of the
> logic (for example compression, temp file handling) that is already
> implemented in Beam FileIO/WriteFiles transforms in your SMB sink transform.
> >> >>>
> >> >>> For reader, you are right that there's no FileIO.Read. What we have
> are various implementations of FileBasedSource/FileBasedReader classes that
> are currently intentionally hidden since Beam IO transforms are expected to
> be the intended public interface for users. If you can expose and re-use
> these classes with slight modifications (keeping backwards compatibility)
> I'm OK with it. Otherwise you'll have to write your own reader
> implementations.
> >> >>>
> >> >>> In general, seems like SMB has very strong requirements related to
> sharding/hot-key management that are not easily achievable by implementing
> SMB source/sink as a composite transform that utilizes existing source/sink
> transforms. This forces you to implement this logic in your own DoFns and
> existing Beam primitives are not easily re-usable in this context.
> >> >>>
> >> >>> Thanks,
> >> >>> Cham
> >> >>>
> >> >>> On Tue, Jul 16, 2019 at 8:26 AM Neville Li <ne...@gmail.com>
> wrote:
> >> >>>>
> >> >>>> A little clarification of the IO requirement and my understanding
> of the current state of IO.
> >> >>>>
> >> >>>> tl;dr: not sure if there are reusable bits for the reader. It's
> possible to reuse some for the writer but with heavy refactoring.
> >> >>>>
> >> >>>> Reader
> >> >>>>
> >> >>>> For each bucket (containing the same key partition, sorted) across
> multiple input data sets, we stream records from bucket files and merge
> sort.
> >> >>>> We open the files in a DoFn, and emit KV<K, CoGbkResult> where the
> CGBKR encapsulates Iterable<V> from each input.
> >> >>>> Basically we need a simple API like ResourceId -> Iterator<T>,
> i.e. sequential read, no block/offset/split requirement.
> >> >>>> FileBasedSource.FileBasedReader seems the closest fit but they're
> nested and coupled to their parent classes.
> >> >>>> There's no FileIO.Read, only a ReadMatches[1], which can be used
> with ReadAllViaFileBasedSource<T>. But that's not the granularity we need,
> since we lose ordering of the input records, and can't merge 2+ sources.
> >> >>>>
> >> >>>> Writer
> >> >>>>
> >> >>>> We get a `PCollection<KV<BucketShardId, Iterable<T>>>` after bucketing
> and sorting, where Iterable<T> is the records sorted by key and
> BucketShardId is used to produce filename, e.g.
> bucket-00001-shard-00002.avro.
> >> >>>> We write each Iterable<T> to a temp file and move to final
> destination when done. Both should ideally reuse existing code.
> >> >>>> Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO)
> supports record writing into a WritableByteChannel, but some logic like
> compression is handled in FileIO through ViaFileBasedSink which extends
> FileBasedSink.
> >> >>>> FileIO uses WriteFiles[3] to shard and write a PCollection<T>.
> Again we lose the ordering of the output records and any custom file naming scheme.
> However, WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in
> WriteFiles seem closest to our need but would have to be split out and
> generalized.
> >> >>>>
> >> >>>> Note on reader block/offset/split requirement
> >> >>>>
> >> >>>> Because of the merge sort, we can't split or offset seek a bucket
> file: without persisting the offset index of a key group somewhere,
> we can't efficiently skip to a key group without exhausting the previous
> ones. Furthermore we need to merge sort and align keys from multiple
> sources, which may not have the same key distribution. It might be possible
> to binary search for matching keys but that's extra complication. IMO the
> reader work distribution is better solved by better bucket/shard strategy
> in upstream writer.
> >> >>>>
> >> >>>> References
> >> >>>>
> >> >>>> ReadMatches extends PTransform<PCollection<MatchResult.Metadata>,
> PCollection<ReadableFile>>
> >> >>>> ReadAllViaFileBasedSource<T> extends
> PTransform<PCollection<ReadableFile>, PCollection<T>>
> >> >>>> WriteFiles<UserT, DestinationT, OutputT> extends
> PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
> >> >>>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
> Iterable<UserT>>, FileResult<DestinationT>>
> >> >>>> FinalizeTempFileBundles extends PTransform<
> PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>
> >> >>>>
> >> >>>>
> >> >>>> On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw <
> robertwb@google.com> wrote:
> >> >>>>>
> >> >>>>> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov <
> kirpichov@google.com> wrote:
> >> >>>>> >
> >> >>>>> > Quick note: I didn't look through the document, but please do
> not build on either FileBasedSink or FileBasedReader. They are both
> remnants of the old, non-composable IO world; and in fact much of the
> composable IO work emerged from frustration with their limitations and
> recognizing that many other IOs were suffering from the same limitations.
> >> >>>>> > Instead of FileBasedSink, build on FileIO.write; instead of
> FileBasedReader, build on FileIO.read.
> >> >>>>>
> >> >>>>> +1
> >> >>>>>
> >> >>>>> I think the sink could be written atop FileIO.write, possibly
> using
> >> >>>>> dynamic destinations. At the very least the FileSink interface,
> which
> >> >>>>> handles the details of writing a single shard, would be an ideal
> way
> >> >>>>> to parameterize an SMB sink. It seems that none of our existing
> IOs
> >> >>>>> (publicly?) expose FileSink implementations.
> >> >>>>>
> >> >>>>> FileIO.read is not flexible enough to do the merging. Eugene, is
> there
> >> >>>>> a composable analogue to FileSink, for sources, i.e. something
> that
> >> >>>>> can turn a file handle (possibly with offsets) into a set of
> records
> >> >>>>> other than FileBasedReader?
> >> >>>>>
> >> >>>>> > On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov <gl...@spotify.com>
> wrote:
> >> >>>>> >>
> >> >>>>> >> I share the same concern with Robert regarding re-implementing
> parts of IO. At the same time, in the past, I worked on internal libraries
> that try to re-use code from existing IO, and it's hardly possible because
> it feels like it wasn't designed for re-use. There are a lot of classes
> that are nested (non-static) or non-public. I can understand why they were
> made non-public, it's a hard abstraction to design well and keep
> compatibility. As Neville mentioned, decoupling readers and writers would
> benefit not only this proposal but any other use case that has to
> deal with low-level APIs such as the FileSystem API, which is hardly possible
> today without copy-pasting.
> >> >>>>> >>
> >> >>>>> >> On Mon, Jul 15, 2019 at 5:05 PM Neville Li <
> neville.lyh@gmail.com> wrote:
> >> >>>>> >>>
> >> >>>>> >>> Re: avoiding mirroring IO functionality, what about:
> >> >>>>> >>>
> >> >>>>> >>> - Decouple the nested FileBasedSink.Writer and
> FileBasedSource.FileBasedReader, make them top level and remove references
> to parent classes.
> >> >>>>> >>> - Simplify the interfaces, while maintaining support for
> block/offset read & sequential write.
> >> >>>>> >>> - As a bonus, the refactored IO classes can be used
> standalone when the user wants to perform custom IO in a DoFn, e.g.
> a PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>>. Today
> this requires a lot of copy-pasted Avro boilerplate.
> >> >>>>> >>> - For compatibility, we can delegate to the new classes from
> the old ones and remove them in the next breaking release.
> >> >>>>> >>>
> >> >>>>> >>> Re: WriteFiles logic, I'm not sure about generalizing it, but
> what about splitting the part handling writing temp files into a new
> PTransform<PCollection<KV<ResourceId, Iterable<UserT>>>,
> PCollection<WriteFilesResult<DestinationT>>>? That splits the bucket-shard
> logic from actual file IO.
> >> >>>>> >>>
> >> >>>>> >>> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw <
> robertwb@google.com> wrote:
> >> >>>>> >>>>
> >> >>>>> >>>> I agree that generalizing the existing FileIO may not be the
> right
> >> >>>>> >>>> path forward, and I'd only make their innards public with
> great care.
> >> >>>>> >>>> (Would this be used like
> >> >>>>> >>>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?)
> SMB is a bit
> >> >>>>> >>>> unique in that the source and sink are much more coupled than
> other
> >> >>>>> >>>> sources and sinks (which happen to be completely
> independent, if
> >> >>>>> >>>> complementary implementations, whereas SMB attempts to be a
> kind of
> >> >>>>> >>>> pipe where one half is instantiated in each pipeline).
> >> >>>>> >>>>
> >> >>>>> >>>> In short, an SMB source/sink that is parameterized by an
> arbitrary,
> >> >>>>> >>>> existing IO would be ideal (but possibly not feasible (per
> existing
> >> >>>>> >>>> prioritizations)), or an SMB source/sink that works as a
> pair. What
> >> >>>>> >>>> I'd like to avoid is a set of parallel SMB IO classes that
> (partially,
> >> >>>>> >>>> and incompletely) mirror the existing IO ones (from an API
> >> >>>>> >>>> perspective--how much implementation it makes sense to share
> is an
> >> >>>>> >>>> orthogonal issue that I'm sure can be worked out.)
> >> >>>>> >>>>
> >> >>>>> >>>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li <
> neville.lyh@gmail.com> wrote:
> >> >>>>> >>>> >
> >> >>>>> >>>> > Hi Robert,
> >> >>>>> >>>> >
> >> >>>>> >>>> > I agree, it'd be nice to reuse FileIO logic of different
> file types. But given the current code structure of FileIO & scope of the
> change, I feel it's better left for future refactor PRs.
> >> >>>>> >>>> >
> >> >>>>> >>>> > Some thoughts:
> >> >>>>> >>>> > - SMB file operations are simple single-file sequential
> reads/writes, which already exist as Writer & FileBasedReader but are
> private inner classes, and have references to the parent Sink/Source
> instance.
> >> >>>>> >>>> > - The readers also have extra offset/split logic but that
> can be worked around.
> >> >>>>> >>>> > - It'll be nice to not duplicate temp->destination file
> logic but again WriteFiles is assuming a single integer shard key, so it'll
> take some refactoring to reuse it.
> >> >>>>> >>>> >
> >> >>>>> >>>> > All of these can be done in a backwards-compatible way. OTOH
> generalizing the existing components too much (esp. WriteFiles, which is
> already complex) might lead to two logic paths, one specialized for the SMB
> case. It might be easier to decouple some of them for better reuse. But
> again I feel it's a separate discussion.
> >> >>>>> >>>> >
> >> >>>>> >>>> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <
> claire.d.mcginty@gmail.com> wrote:
> >> >>>>> >>>> >>
> >> >>>>> >>>> >> Thanks Robert!
> >> >>>>> >>>> >>
> >> >>>>> >>>> >> We'd definitely like to be able to re-use existing I/O
> components--for example the Writer<DestinationT,
> OutputT>/FileBasedReader<T> (since they operate on a
> WritableByteChannel/ReadableByteChannel, which is the level of granularity
> we need) but the Writers, at least, seem to be mostly private-access. Do
> you foresee them being made public at any point?
> >> >>>>> >>>> >>
> >> >>>>> >>>> >> - Claire
> >> >>>>> >>>> >>
> >> >>>>> >>>> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw <
> robertwb@google.com> wrote:
> >> >>>>> >>>> >>>
> >> >>>>> >>>> >>> I left some comments on the doc.
> >> >>>>> >>>> >>>
> >> >>>>> >>>> >>> I think the general idea is sound, but one thing that
> worries me is
> >> >>>>> >>>> >>> the introduction of a parallel set of IOs that mirrors
> the (existing)
> >> >>>>> >>>> >>> FileIOs. I would suggest either (1) incorporate this
> functionality
> >> >>>>> >>>> >>> into the generic FileIO infrastructure, or let it be
> parameterized by
> >> >>>>> >>>> >>> arbitrary IO (which I'm not sure is possible, especially
> for the Read
> >> >>>>> >>>> >>> side (and better would be the capability of supporting
> arbitrary
> >> >>>>> >>>> >>> sources, aka an optional "as-sharded-source" operation
> that returns a
> >> >>>>> >>>> >>> PTransform<..., KV<shard-id, Iterable<KV<K, V>>> where
> the iterable is
> >> >>>>> >>>> >>> promised to be in key order)) or support a single SMB aka
> >> >>>>> >>>> >>> "PreGrouping" source/sink pair that's always used
> together (and whose
> >> >>>>> >>>> >>> underlying format is not necessarily public).
> >> >>>>> >>>> >>>
> >> >>>>> >>>> >>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li <
> neville.lyh@gmail.com> wrote:
> >> >>>>> >>>> >>> >
> >> >>>>> >>>> >>> > 4 people have commented but mostly clarifying details
> and not much on the overall design.
> >> >>>>> >>>> >>> >
> >> >>>>> >>>> >>> > It'd be great to have thumbs up/down on the design,
> specifically metadata, bucket & shard strategy, etc., since that affects
> backwards compatibility of output files.
> >> >>>>> >>>> >>> > Some breaking changes, e.g. dynamic # of shards, are
> out of scope for V1 unless someone feels strongly about it. The current
> scope should cover all our use cases and leave room for optimization.
> >> >>>>> >>>> >>> >
> >> >>>>> >>>> >>> > Once green lighted we can start adopting internally,
> ironing out rough edges while iterating on the PRs in parallel.
> >> >>>>> >>>> >>> >
> >> >>>>> >>>> >>> > Most of the implementation is self-contained in the
> extensions:smb module, except making a few core classes/methods public for
> reuse. So despite the amount of work it's still fairly low risk to the code
> base. There are some proposed optimizations and refactorings involving core (see
> appendix), but IMO they're better left for follow-up PRs.
> >> >>>>> >>>> >>> >
> >> >>>>> >>>> >>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <
> kenn@apache.org> wrote:
> >> >>>>> >>>> >>> >>
> >> >>>>> >>>> >>> >> I've seen some discussion on the doc. I cannot tell
> whether the questions are resolved or what the status of review is. Would
> you mind looping this thread with a quick summary? This is such a major
> piece of work I don't want it to sit with everyone thinking they are
> waiting on someone else, or any such thing. (not saying this is happening,
> just pinging to be sure)
> >> >>>>> >>>> >>> >>
> >> >>>>> >>>> >>> >> Kenn
> >> >>>>> >>>> >>> >>
> >> >>>>> >>>> >>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <
> neville.lyh@gmail.com> wrote:
> >> >>>>> >>>> >>> >>>
> >> >>>>> >>>> >>> >>> Updated the doc a bit with more future work
> (appendix). IMO most of them are non-breaking and better done in separate
> PRs later since some involve pretty big refactoring and are outside the
> scope of MVP.
> >> >>>>> >>>> >>> >>>
> >> >>>>> >>>> >>> >>> For now we'd really like to get feedback on some
> fundamental design decisions and find a way to move forward.
> >> >>>>> >>>> >>> >>>
> >> >>>>> >>>> >>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <
> neville.lyh@gmail.com> wrote:
> >> >>>>> >>>> >>> >>>>
> >> >>>>> >>>> >>> >>>> Thanks. I responded to comments in the doc. More
> inline.
> >> >>>>> >>>> >>> >>>>
> >> >>>>> >>>> >>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath <
> chamikara@google.com> wrote:
> >> >>>>> >>>> >>> >>>>>
> >> >>>>> >>>> >>> >>>>> Thanks, added a few comments.
> >> >>>>> >>>> >>> >>>>>
> >> >>>>> >>>> >>> >>>>> If I understood correctly, you basically assign
> elements with keys to different buckets which are written to unique files
> and merge files for the same key while reading ?
> >> >>>>> >>>> >>> >>>>>
> >> >>>>> >>>> >>> >>>>> Some of my concerns are:
> >> >>>>> >>>> >>> >>>>>
> >> >>>>> >>>> >>> >>>>> (1)  Seems like you rely on an in-memory sorting
> of buckets. Will this end up limiting the size of a PCollection you can
> process ?
> >> >>>>> >>>> >>> >>>>
> >> >>>>> >>>> >>> >>>> The sorter transform we're using supports spilling
> and external sort. We can break up large key groups further by sharding,
> similar to fan out in some GBK transforms.
> >> >>>>> >>>> >>> >>>>
> >> >>>>> >>>> >>> >>>>> (2) Seems like you rely on
> Reshuffle.viaRandomKey() which is actually implemented using a shuffle
> (which you try to replace with this proposal).
> >> >>>>> >>>> >>> >>>>
> >> >>>>> >>>> >>> >>>> That's for distributing task metadata, so that each
> DoFn thread picks up a random bucket and sort merge key-values. It's not
> shuffling actual data.
> >> >>>>> >>>> >>> >>>>
> >> >>>>> >>>> >>> >>>>>
> >> >>>>> >>>> >>> >>>>> (3) I think (at least some of the) shuffle
> implementations are implemented in ways similar to this (writing to files
> and merging). So I'm wondering if the performance benefits you see are for
> a very specific case and may limit the functionality in other ways.
> >> >>>>> >>>> >>> >>>>
> >> >>>>> >>>> >>> >>>> This is for the common pattern of few core data
> producer pipelines and many downstream consumer pipelines. It's not
> intended to replace shuffle/join within a single pipeline. On the producer
> side, by pre-grouping/sorting data and writing to bucket/shard output
> files, the consumer can sort/merge matching ones without a CoGBK.
> Essentially we're paying the shuffle cost upfront to avoid them repeatedly
> in each consumer pipeline that wants to join data.
> >> >>>>> >>>> >>> >>>>
> >> >>>>> >>>> >>> >>>>>
> >> >>>>> >>>> >>> >>>>> Thanks,
> >> >>>>> >>>> >>> >>>>> Cham
> >> >>>>> >>>> >>> >>>>>
> >> >>>>> >>>> >>> >>>>>
> >> >>>>> >>>> >>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li <
> neville.lyh@gmail.com> wrote:
> >> >>>>> >>>> >>> >>>>>>
> >> >>>>> >>>> >>> >>>>>> Ping again. Any chance someone takes a look to
> get this thing going? It's just a design doc and basic metadata/IO impl.
> We're not talking about actual source/sink code yet (already done but saved
> for future PRs).
> >> >>>>> >>>> >>> >>>>>>
> >> >>>>> >>>> >>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <
> altay@google.com> wrote:
> >> >>>>> >>>> >>> >>>>>>>
> >> >>>>> >>>> >>> >>>>>>> Thank you Claire, this looks promising.
> Explicitly adding a few folks that might have feedback: +Ismaël Mejía
> +Robert Bradshaw +Lukasz Cwik +Chamikara Jayalath
> >> >>>>> >>>> >>> >>>>>>>
> >> >>>>> >>>> >>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <
> claire.d.mcginty@gmail.com> wrote:
> >> >>>>> >>>> >>> >>>>>>>>
> >> >>>>> >>>> >>> >>>>>>>> Hey dev@!
> >> >>>>> >>>> >>> >>>>>>>>
> >> >>>>> >>>> >>> >>>>>>>> A few other Spotify data engineers and I
> have put together a design doc for SMB Join support in Beam, and have a
> working Java implementation we've started to put up for PR ([0], [1], [2]).
> There's more detailed information in the document, but the tl;dr is that
> SMB is a strategy to optimize joins for file-based sources by modifying the
> initial write operation to write records in sorted buckets based on the
> desired join key. This means that subsequent joins of datasets written in
> this way are only sequential file reads, no shuffling involved. We've seen
> some pretty substantial performance speedups with our implementation and
> would love to get it checked in to Beam's Java SDK.
> >> >>>>> >>>> >>> >>>>>>>>
> >> >>>>> >>>> >>> >>>>>>>> We'd appreciate any suggestions or feedback on
> our proposal--the design doc should be public to comment on.
> >> >>>>> >>>> >>> >>>>>>>>
> >> >>>>> >>>> >>> >>>>>>>> Thanks!
> >> >>>>> >>>> >>> >>>>>>>> Claire / Neville
> >> >>>>> >>
> >> >>>>> >>
> >> >>>>> >>
> >> >>>>> >> --
> >> >>>>> >> Cheers,
> >> >>>>> >> Gleb
> >> >
> >> >
> >> >
> >> > --
> >> > Cheers,
> >> > Gleb
> >
> >
> >
> > --
> > Cheers,
> > Gleb
>
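The reader requirement Neville describes in the thread above boils down to a sequential "file -> Iterator<T>" API with no block, offset or split support. A minimal sketch in plain Java, assuming newline-delimited UTF-8 text and using java.nio.file.Path as a stand-in for Beam's ResourceId; SequentialReader and LineReader are hypothetical names, not existing Beam classes.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical API: open a file and read its records strictly in order.
interface SequentialReader<T> {
  Iterator<T> open(Path file) throws IOException;
}

// Example implementation for newline-delimited UTF-8 text.
final class LineReader implements SequentialReader<String> {
  @Override
  public Iterator<String> open(Path file) throws IOException {
    BufferedReader in = Files.newBufferedReader(file, StandardCharsets.UTF_8);
    return new Iterator<String>() {
      private String next = advance();

      // Reads the next line, closing the file once it is exhausted.
      private String advance() {
        try {
          String line = in.readLine();
          if (line == null) {
            in.close();
          }
          return line;
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }

      @Override
      public boolean hasNext() {
        return next != null;
      }

      @Override
      public String next() {
        if (next == null) {
          throw new NoSuchElementException();
        }
        String current = next;
        next = advance();
        return current;
      }
    };
  }
}
```

Because the iterator only ever moves forward, one such reader per input is enough to feed a merge over key-sorted bucket files.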

Re: Sort Merge Bucket - Action Items

Posted by Claire McGinty <cl...@gmail.com>.
As far as I/O code re-use, the consensus seems to be to make the SMB module
as composable as possible using existing Beam components, ideally as-is or
with very basic tweaks.

> To be clear, what I care about is that WriteFiles(X) and
> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
> TFRecord, ...}. In other words composability of the API (vs. manually
> filling out the matrix). If WriteFiles and WriteSmbFiles find
> opportunities for (easy, clean) implementation sharing, that'd be
> nice, but not the primary goal.


For SMB writes, it's a pretty easy change to parameterize them by
FileIO.Sink, for which there are already public implementations for
Avro/TFRecord/Parquet/Text! It'll remove a lot of code duplication from the
smb module.
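A sketch of what parameterizing the SMB write by a sink could look like. SinkLike below is a hypothetical stand-in that mirrors the three methods of Beam's FileIO.Sink<ElementT> (open on a WritableByteChannel, write an element, flush) so the example runs without Beam; SmbBucketWriter and TextLineSink are illustrative names. The writer opens the sink on a channel, streams one bucket's already-sorted records through it, and flushes. It also checks the sort order, since an out-of-order record would break the merge on the read side.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;

// Hypothetical stand-in mirroring the three methods of Beam's FileIO.Sink<ElementT>.
interface SinkLike<T> extends Serializable {
  void open(WritableByteChannel channel) throws IOException;

  void write(T element) throws IOException;

  void flush() throws IOException;
}

// Toy sink writing one UTF-8 line per element.
final class TextLineSink implements SinkLike<String> {
  private transient WritableByteChannel channel;

  @Override
  public void open(WritableByteChannel channel) {
    this.channel = channel;
  }

  @Override
  public void write(String element) throws IOException {
    ByteBuffer buf = ByteBuffer.wrap((element + "\n").getBytes(StandardCharsets.UTF_8));
    while (buf.hasRemaining()) {
      channel.write(buf);
    }
  }

  @Override
  public void flush() {
    // Nothing is buffered here; a format with an internal buffer would flush it.
  }
}

final class SmbBucketWriter {
  // Streams one bucket-shard's records, already sorted by key, through any sink.
  static <T> void writeBucket(
      SinkLike<T> sink, WritableByteChannel channel, Iterable<T> sortedRecords,
      Comparator<? super T> keyOrder) throws IOException {
    sink.open(channel);
    T previous = null;
    for (T record : sortedRecords) {
      // Guard the invariant the read-side merge depends on.
      if (previous != null && keyOrder.compare(previous, record) > 0) {
        throw new IllegalArgumentException("records not sorted: " + previous + " > " + record);
      }
      sink.write(record);
      previous = record;
    }
    // Closing the channel is left to the caller, which owns it.
    sink.flush();
  }
}
```

Keeping the writer unaware of the file format is what would let the same sink X be handed to both WriteFiles(X) and an SMB write, which is the composability asked for above; temp-file handling and final renames stay outside this sketch.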

> (Similarly for reading, though that seems less obvious. Certainly
> whatever T is useful for ReadSmb(T) could be useful for a
> (non-liquid-sharding) ReadAll(T) however.)


It seems like there isn't an easily composable equivalent for Reader that
isn't coupled with FileBasedSource (+1 on Gleb's question about the
long-term future of org.apache.beam.sdk.io.Read). One thing we could do to
improve parity between SMB reads and Beam's io module is to use ReadableFile
<https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L381>
as our file handles (currently we just use plain ResourceIds and the
FileSystems API to open them), so our Source transform would more closely
resemble the existing ReadFiles transforms
<https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L698>.
ReadableFile also brings in io.Compression
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java>
and handles opening a ReadableByteChannel for us. We'd still be
re-implementing the I/O operations that deserialize a bytestream into
individual elements, but this seems unavoidable for the time being.
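The part that stays SMB-specific on the read side is the merge itself: stream each input's bucket file in key order and group matching keys, roughly what ends up in the KV<K, CoGbkResult> output described earlier in the thread. A two-input sketch in plain Java, assuming both inputs are already sorted by key and contain no null records. SortMergeJoin and Group are hypothetical names, and a real implementation would emit each group as it is completed rather than collecting them into a list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

final class SortMergeJoin {
  // All records sharing one key, split by input (a two-input CoGbkResult analogue).
  static final class Group<K, A, B> {
    final K key;
    final List<A> left = new ArrayList<>();
    final List<B> right = new ArrayList<>();

    Group(K key) {
      this.key = key;
    }

    @Override
    public String toString() {
      return key + "=" + left + "|" + right;
    }
  }

  // Merges two key-sorted inputs; null marks an exhausted input.
  static <K extends Comparable<K>, A, B> List<Group<K, A, B>> merge(
      Iterator<A> lefts, Function<A, K> leftKey,
      Iterator<B> rights, Function<B, K> rightKey) {
    List<Group<K, A, B>> out = new ArrayList<>();
    A a = lefts.hasNext() ? lefts.next() : null;
    B b = rights.hasNext() ? rights.next() : null;
    while (a != null || b != null) {
      // The next key to emit is the smaller of the two current heads.
      K key;
      if (a == null) {
        key = rightKey.apply(b);
      } else if (b == null) {
        key = leftKey.apply(a);
      } else {
        K ka = leftKey.apply(a);
        K kb = rightKey.apply(b);
        key = ka.compareTo(kb) <= 0 ? ka : kb;
      }
      Group<K, A, B> group = new Group<>(key);
      while (a != null && leftKey.apply(a).compareTo(key) == 0) {
        group.left.add(a);
        a = lefts.hasNext() ? lefts.next() : null;
      }
      while (b != null && rightKey.apply(b).compareTo(key) == 0) {
        group.right.add(b);
        b = rights.hasNext() ? rights.next() : null;
      }
      out.add(group);
    }
    return out;
  }
}
```

Each input is read once, front to back, which is why the bucket files need no offset index or split support for this step.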

Let me know what you think about these proposed modifications to SMB
read/write!

Thanks,
Claire

On Thu, Jul 25, 2019 at 9:39 AM Gleb Kanterov <gl...@spotify.com> wrote:

> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
> away in favor of SDF, or are we always going to have both?
>
> I was looking into AvroIO.read and AvroIO.readAll, both of them
> use AvroSource. AvroIO.readAll is using SDF, and it's implemented with
> ReadAllViaFileBasedSource that takes AvroSource as a parameter. Looking at
> ReadAllViaFileBasedSource, I don't find it necessary to use Source<?>; it
> should be enough to have something like (KV<ReadableFile, OffsetRange>,
> OutputReceiver<T>), as we have discussed in this thread, and that should be
> fine for SMB as well. It would require duplicating code from AvroSource,
> but in the end, I don't see it as a problem if AvroSource is going away.
>
> I'm attaching a small diagram I put together for myself to better understand the
> code.
>
> AvroIO.readAll :: PTransform<PCollection<String>, PCollection<T>> ->
>
> FileIO.matchAll :: PTransform<PCollection<String>,
> PCollection<MatchResult.Metadata>>
> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>,
> PCollection<ReadableFile>>
> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>,
> PCollection<T>> ->
>
> ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>,
> PCollection<T>> ->
>
> ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile,
> OffsetRange>>) (splittable do fn)
>
> Reshuffle.viaRandomKey()
>
> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile,
> OffsetRange>, T>) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin,
> PCollection<T>> ->
>
> Read.Bounded.from(createSource) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> Gleb
>
>
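Gleb's suggested shape, a function of (KV<ReadableFile, OffsetRange>, OutputReceiver<T>), can be sketched without any Source<?> class. Below, a byte array stands in for the opened ReadableFile, a pair of longs for OffsetRange, and a java.util.function.Consumer for OutputReceiver; all three substitutions and the NewlineRangeReader name are assumptions for illustration. It uses the usual split rule for delimited records: a range owns every record that starts inside [start, end), so adjacent ranges read each record exactly once.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class NewlineRangeReader {
  // Emits every newline-delimited record whose first byte lies in [start, end).
  // Sketch assumption: the whole file fits in a byte array.
  static void read(byte[] file, long start, long end, Consumer<String> out) {
    int pos = (int) Math.min(start, file.length);
    if (pos > 0 && file[pos - 1] != '\n') {
      // `start` is mid-record: that record belongs to the preceding range.
      while (pos < file.length && file[pos] != '\n') {
        pos++;
      }
      pos++; // step past the newline
    }
    while (pos < end && pos < file.length) {
      int recordStart = pos;
      while (pos < file.length && file[pos] != '\n') {
        pos++;
      }
      out.accept(new String(file, recordStart, pos - recordStart, StandardCharsets.UTF_8));
      pos++; // step past the newline
    }
  }
}
```

For the bytes "a\nbb\nccc\n", reading [0, 3) yields a and bb, and reading [3, 9) yields only ccc, because the record starting at offset 2 belongs to the first range.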
> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <ke...@apache.org> wrote:
>> >
>> > From the peanut gallery, keeping a separate implementation for SMB
>> seems fine. Dependencies are serious liabilities for both upstream and
>> downstream. It seems like the reuse angle is generating extra work, and
>> potentially making already-complex implementations more complex, instead of
>> helping things.
>>
>> +1
>>
>> To be clear, what I care about is that WriteFiles(X) and
>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>> TFRecord, ...}. In other words composability of the API (vs. manually
>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>> opportunities for (easy, clean) implementation sharing, that'd be
>> nice, but not the primary goal.
>>
>> (Similarly for reading, though that seems less obvious. Certainly
>> whatever T is useful for ReadSmb(T) could also be useful for a
>> (non-liquid-sharding) ReadAll(T).)
>>
>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <ne...@gmail.com>
>> wrote:
>> >>
>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be
>> determined until the last finalize transform, which is again different from
>> the current SMB proposal (static number of buckets & shards).
>> >> I'll end up with more code specialized for SMB in order to generalize
>> existing sink code, which I think we all want to avoid.
>> >>
>> >> Seems the only option is duplicating some logic like temp file
>> handling, which is exactly what we did in the original PR.
>> >> I can reuse Compression & Sink<T> for file level writes but that seems
>> about the most I can reuse right now.
>> >>
>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com>
>> wrote:
>> >>>
>> >>> So I spent one afternoon trying some ideas for reusing the last few
>> transforms of WriteFiles.
>> >>>
>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
>> Iterable<UserT>>, FileResult<DestinationT>>
>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
>> PCollection<List<ResultT>>>
>> >>> => FinalizeTempFileBundles extends
>> PTransform<PCollection<List<FileResult<DestinationT>>>,
>> WriteFilesResult<DestinationT>>
>> >>>
>> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId>
>> so I can use pre-computed SMB destination file names for the transforms.
>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
>> private and easy to change/pull out.
>> >>>
>> >>> OTOH they are somewhat coupled with the package-private
>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
>> temp file handling logic lives). It might be hard to decouple them, either by
>> modifying existing code or by creating new transforms, unless we rewrite most
>> of FileBasedSink from scratch.
>> >>>
>> >>> Let me know if I'm on the wrong track.
>> >>>
>> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>> >>>
>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <
>> chamikara@google.com> wrote:
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>>>>
>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>> >>>>> >
>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>>>> >>
>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <
>> neville.lyh@gmail.com> wrote:
>> >>>>> >> >
>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it
>> and see what needs to be done.
>> >>>>> >> >
>> >>>>> >> > Eugene pointed out that we shouldn't build on
>> FileBased{Source,Sink}. So for writes I'll probably build on top of
>> WriteFiles.
>> >>>>> >>
>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>> >>>>> >>
>> >>>>> >>
>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>> >>>>> >
>> >>>>> > Yeah if possible, parameterize FileIO.Sink.
>> >>>>> > I would recommend against building on top of WriteFiles either.
>> FileIO being implemented on top of WriteFiles was supposed to be a
>> temporary measure - the longer-term plan was to rewrite it from scratch
>> (albeit with a similar structure) and throw away WriteFiles.
>> >>>>> > If possible, I would recommend to pursue this path: if there are
>> parts of WriteFiles you want to reuse, I would recommend to implement them
>> as new transforms, not at all tied to FileBasedSink (but ok if tied to
>> FileIO.Sink), with the goal in mind that FileIO could be rewritten on top
>> of these new transforms, or maybe parts of WriteFiles could be swapped out
>> for them incrementally.
>> >>>>>
>> >>>>> Thanks for the feedback. There's a lot that was done, but looking at
>> >>>>> the code it feels like there's a lot that was not yet done either,
>> and
>> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not
>> finding
>> >>>>> the right docs).
>> >>>>
>> >>>>
>> >>>> I'm also a bit unfamiliar with the original plans for WriteFiles and for
>> updating source interfaces, but I'd prefer not to significantly modify
>> existing IO transforms to suit the SMB use case. If there are existing
>> pieces of code that can be easily reused, that is fine, but existing
>> sources/sinks are designed to perform a PCollection -> file transformation
>> and vice versa with (usually) runner-determined sharding. Things specific
>> to SMB, such as sharding restrictions, writing metadata to a separate file,
>> and reading multiple files from the same abstraction, do not sound like
>> features that should be included in our usual file read/write transforms.
>> >>>>
>> >>>>>
>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered
>> elements across files within a bucket and TBH I'm not even sure where to
>> start.
>> >>>>> >>
>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>> >>>>> >> Iterable<T>. There are existing
>> PTransform<PCollection<ReadableFile>,
>> >>>>> >> PCollection<T>> but such an interface is insufficient to extract
>> >>>>> >> ordered records per shard. It seems the only concrete
>> implementations
>> >>>>> >> are based on FileBasedSource, which we'd like to avoid, but
>> there's no
>> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>> >>>>> >> cumbersome to call (given the reflection machinery involved in
>> >>>>> >> invoking DoFns).
>> >>>>> >
>> >>>>> > Seems easiest to just define a new regular Java interface for
>> this.
>> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or
>> something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void.
>> Depends on how much control over iteration you need.
>> >>>>>
>> >>>>> For this application, one wants to iterate over several files in
>> >>>>> parallel. The downside of a new interface is that it shares almost
>> >>>>> nothing with the "normal" sources (e.g. when features (or
>> >>>>> optimizations) get added to one, they won't get added to the other).
>> >>>>
>> >>>>
>> >>>>>
>> >>>>>
>> >>>>> > And yes, DoFns, including SDFs, are not designed to be used as
>> Java interfaces per se. If you need DoFn machinery in this interface (e.g.
>> side inputs), use Contextful (s.apache.org/context-fn).
>> >>>>>
>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>> >>>>> to build new DoFns out of others (or, really, use them in any
>> context
>> >>>>> other than as an argument to ParDo).
>> >>>>>
>> >>>>> >> > I'll file separate PRs for core changes needed for discussion.
>> WDYT?
>> >>>>> >>
>> >>>>> >> Sounds good.
>> >>>>
>> >>>>
>> >>>> +1
>> >>>>
>> >>>>>
>> >>>>> >>
>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>>>> >> >>
>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <
>> neville.lyh@gmail.com> wrote:
>> >>>>> >> >> >
>> >>>>> >> >> > Forking this thread to discuss action items regarding the
>> change. We can keep technical discussion in the original thread.
>> >>>>> >> >> >
>> >>>>> >> >> > Background: our SMB POC showed promising performance & cost
>> saving improvements and we'd like to adopt it for production soon (by EOY).
>> We want to contribute it to Beam so it's better generalized and maintained.
>> We also want to avoid divergence between our internal version and the PR
>> while it's in progress, specifically any breaking change in the produced
>> SMB data.
>> >>>>> >> >>
>> >>>>> >> >> All good goals.
>> >>>>> >> >>
>> >>>>> >> >> > To achieve that I'd like to propose a few action items.
>> >>>>> >> >> >
>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
>> handling, bucket file and metadata format, etc., anything that affects
>> produced SMB data.
>> >>>>> >> >> > 2. Revise the existing PR according to #1
>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
>> Compression, etc., but keep the existing file level abstraction
>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark
>> clearly as @experimental
>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
>> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>> >>>>> >> >> >
>> >>>>> >> >> > #1-4 give us something usable in the short term, while #1
>> guarantees that production data produced today are usable when #5 lands on
>> master. #4 also gives early adopters a chance to give feedback.
>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a
>> couple of big PRs to achieve, which we can keep iterating on.
>> >>>>> >> >> >
>> >>>>> >> >> > What are your thoughts on this?
>> >>>>> >> >>
>> >>>>> >> >> I would like to see some resolution on the FileIO
>> abstractions before
>> >>>>> >> >> merging into experimental. (We have a FileBasedSink that
>> would mostly
>> >>>>> >> >> already work, so it's a matter of coming up with an analogous
>> Source
>> >>>>> >> >> interface.) Specifically I would not want to merge a set of
>> per file
>> >>>>> >> >> type smb IOs without a path forward to this or the
>> determination that
>> >>>>> >> >> it's not possible/desirable.
>>
>
>
> --
> Cheers,
> Gleb
>
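
The point made in the thread above, that an SMB read needs to iterate over several files of a bucket in parallel, is one reason a pull-style ReadableFile -> Iterable<T> shape is easier to build on than a push-style (ReadableFile, OutputReceiver<T>) -> void: records from every file have to be interleaved by key. A minimal, Beam-free sketch of that merge, assuming each per-file iterator is already sorted by key (SortedMergeSketch and merge are hypothetical names, not code from the PRs):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

/** Hypothetical sketch: k-way merge of key-sorted record iterators, one per file in a bucket. */
public final class SortedMergeSketch {

  /** Heap entry holding the current head record of one input and the iterator it came from. */
  private static final class Head<T> {
    final T record;
    final Iterator<T> source;

    Head(T record, Iterator<T> source) {
      this.record = record;
      this.source = source;
    }
  }

  /** Returns every record from the inputs in key order; each input must already be sorted by key. */
  public static <K, T> List<T> merge(
      List<Iterator<T>> inputs, Function<T, K> keyFn, Comparator<K> keyOrder) {
    Comparator<Head<T>> byKey =
        (a, b) -> keyOrder.compare(keyFn.apply(a.record), keyFn.apply(b.record));
    // PriorityQueue rejects an initial capacity below 1, hence the Math.max.
    PriorityQueue<Head<T>> heap = new PriorityQueue<>(Math.max(1, inputs.size()), byKey);
    // Seed the heap with the first record of every non-empty input.
    for (Iterator<T> it : inputs) {
      if (it.hasNext()) {
        heap.add(new Head<>(it.next(), it));
      }
    }
    List<T> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      Head<T> smallest = heap.poll();
      merged.add(smallest.record);
      // Refill from the same input, so the heap never holds more than one record per file.
      if (smallest.source.hasNext()) {
        heap.add(new Head<>(smallest.source.next(), smallest.source));
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Iterator<String>> bucketFiles =
        Arrays.asList(
            Arrays.asList("a", "c", "e").iterator(),
            Arrays.asList("b", "c", "f").iterator(),
            Arrays.<String>asList().iterator());
    // Prints [a, b, c, c, e, f]
    System.out.println(merge(bucketFiles, s -> s, Comparator.<String>naturalOrder()));
  }
}
```

The heap holds at most one record per input, so memory grows with the number of files in a bucket rather than with their size. A real reader would stream the merged output instead of collecting it into a list.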

Re: Sort Merge Bucket - Action Items

Posted by Claire McGinty <cl...@gmail.com>.
Hi! Wanted to bump this thread with some updated PRs that reflect these
discussions (updated IOs that parameterize FileIO#Sink, and re-use
ReadableFile). The base pull requests are:

- https://github.com/apache/beam/pull/8823 (BucketMetadata implementation)
- https://github.com/apache/beam/pull/8824/ (FileOperations/IO
implementations)
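
BucketMetadata is where the bucket and shard strategy from action item #1 gets fixed, since readers must later recompute exactly the same assignment. As a purely hypothetical illustration (the fields, the CRC32 hash, and the file-name pattern are assumptions, not the contents of PR 8823), metadata of this kind could look like:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical bucket metadata: what a reader needs to recompute bucket assignment. */
public final class BucketMetadataSketch {
  private final int numBuckets;
  private final int numShards;

  public BucketMetadataSketch(int numBuckets, int numShards) {
    if (numBuckets <= 0 || numShards <= 0) {
      throw new IllegalArgumentException("numBuckets and numShards must be positive");
    }
    this.numBuckets = numBuckets;
    this.numShards = numShards;
  }

  /** Stable hash of the serialized key; CRC32 is only a stand-in for whatever hash a format fixes. */
  static long hashKey(byte[] keyBytes) {
    CRC32 crc = new CRC32();
    crc.update(keyBytes, 0, keyBytes.length);
    return crc.getValue();
  }

  /** Bucket id in [0, numBuckets); must never change once data has been written. */
  public int bucketId(byte[] keyBytes) {
    return (int) Math.floorMod(hashKey(keyBytes), (long) numBuckets);
  }

  /** Hypothetical file-name pattern encoding the bucket and shard ids. */
  public String fileName(int bucketId, int shardId, String suffix) {
    return String.format(
        "bucket-%05d-of-%05d-shard-%05d-of-%05d%s",
        bucketId, numBuckets, shardId, numShards, suffix);
  }

  public static void main(String[] args) {
    BucketMetadataSketch metadata = new BucketMetadataSketch(8, 2);
    int bucket = metadata.bucketId("user-42".getBytes(StandardCharsets.UTF_8));
    System.out.println(metadata.fileName(bucket, 0, ".avro"));
  }
}
```

Changing the hash function or the key's byte encoding after data has been written would silently move keys to different buckets, which is the kind of breaking change to produced SMB data that the thread wants to settle up front.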

And the actual PTransform implementations build on top of those PRs:
- https://github.com/apache/beam/pull/9250/ (SMB Sink transform)
- https://github.com/apache/beam/pull/9251 (SMB Source transform)

Finally, we have some benchmarks and style changes (using the AutoValue/Builder
pattern) for those PTransforms:
- https://github.com/apache/beam/pull/9253/ (high-level API classes/style
fixes)
- https://github.com/apache/beam/pull/9279 (benchmarks for SMB sink and
source transform)

I know it's a lot of pull requests at once -- let us know if there's
anything else we can clarify or streamline. Thanks!

- Claire/Neville



On Fri, Jul 26, 2019 at 12:45 PM Kenneth Knowles <ke...@apache.org> wrote:

> There is still considerable value in knowing data sources statically so
> you can do things like fetch sizes and other metadata and adjust pipeline
> shape. I would not expect to delete these, but to implement them on top of
> SDF while still giving them a clear URN and payload so runners can know
> that it is a statically-specified source.
>
> Kenn
>
> On Fri, Jul 26, 2019 at 3:23 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>> >
>> > Hi Gleb,
>> >
>> > Regarding the future of io.Read: ideally things would go as follows
>> > - All runners support SDF at feature parity with Read (mostly this is
>> just the Dataflow runner's liquid sharding and size estimation for bounded
>> sources, and backlog for unbounded sources, but I recall that a couple of
>> other runners also used size estimation)
>> > - Bounded/UnboundedSource APIs are declared "deprecated" - it is
>> forbidden to add any new implementations to SDK, and users shouldn't use
>> them either (note: I believe it's already effectively forbidden to use them
>> for cases where a DoFn/SDF at the current level of support will be
>> sufficient)
>> > - People one by one rewrite existing Bounded/UnboundedSource based
>> PTransforms in the SDK to use SDFs instead
>> > - Read.from() is rewritten to use a wrapper SDF over the given Source,
>> and explicit support for Read is deleted from runners
>> > - In the next major version of Beam - presumably 3.0 - the Read
>> transform itself is deleted
>> >
>> > I don't know what the current status of SDF/Read feature parity is;
>> maybe Luke or Cham can comment. An alternative path is offered in
>> http://s.apache.org/sdf-via-source.
>>
>> Python supports initial splitting for SDF of all sources on portable
>> runners. Dataflow support for batch SDF is undergoing testing, not yet
>> rolled out. Dataflow support for streaming SDF is awaiting portable
>> state/timer support.
>>
>> > On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov <gl...@spotify.com> wrote:
>> >>
>> >> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it
>> going away in favor of SDF, or are we always going to have both?
>> >>
>> >> I was looking into AvroIO.read and AvroIO.readAll; both of them use
>> AvroSource. AvroIO.readAll uses SDF and is implemented with
>> ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking at
>> ReadAllViaFileBasedSource, I don't find it necessary to use Source<?>; it
>> should be enough to have something like (KV<ReadableFile, OffsetRange>,
>> OutputReceiver<T>), as we have discussed in this thread, and that should be
>> fine for SMB as well. It would require duplicating code from AvroSource,
>> but in the end, I don't see that as a problem if AvroSource is going away.
>> >>
>> >> I'm attaching a small diagram I put together for myself to better
>> understand the code.
>> >>
>> >> AvroIO.readAll :: PTransform<PBegin, PCollection<T>> ->
>> >>
>> >> FileIO.matchAll :: PTransform<PCollection<String>,
>> PCollection<MatchResult.Metadata>>
>> >> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>,
>> PCollection<ReadableFile>>
>> >> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>,
>> PCollection<T>> ->
>> >>
>> >> ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>,
>> PCollection<T>> ->
>> >>
>> >> ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile,
>> OffsetRange>>) (splittable do fn)
>> >>
>> >> Reshuffle.viaRandomKey()
>> >>
>> >> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile,
>> OffsetRange>, T>) where
>> >>
>> >> createSource :: String -> FileBasedSource<T>
>> >>
>> >> createSource = AvroSource
>> >>
>> >>
>> >> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin,
>> PCollection<T>> ->
>> >>
>> >> Read.Bounded.from(createSource) where
>> >>
>> >> createSource :: String -> FileBasedSource<T>
>> >>
>> >> createSource = AvroSource
>> >>
>> >>
>> >> Gleb
>> >>
>> >>
>> >> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>>
>> >>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <ke...@apache.org>
>> wrote:
>> >>> >
>> >>> > From the peanut gallery, keeping a separate implementation for SMB
>> seems fine. Dependencies are serious liabilities for both upstream and
>> downstream. It seems like the reuse angle is generating extra work, and
>> potentially making already-complex implementations more complex, instead of
>> helping things.
>> >>>
>> >>> +1
>> >>>
>> >>> To be clear, what I care about is that WriteFiles(X) and
>> >>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>> >>> TFRecord, ...}. In other words composability of the API (vs. manually
>> >>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>> >>> opportunities for (easy, clean) implementation sharing, that'd be
>> >>> nice, but not the primary goal.
>> >>>
>> >>> (Similarly for reading, though that seems less obvious. Certainly
>> >>> whatever T is useful for ReadSmb(T) could also be useful for a
>> >>> (non-liquid-sharding) ReadAll(T).)
>> >>>
>> >>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <ne...@gmail.com>
>> wrote:
>> >>> >>
>> >>> >> I spoke too soon. Turns out for unsharded writes, numShards can't
>> be determined until the last finalize transform, which is again different
>> from the current SMB proposal (static number of buckets & shards).
>> >>> >> I'll end up with more code specialized for SMB in order to
>> generalize existing sink code, which I think we all want to avoid.
>> >>> >>
>> >>> >> Seems the only option is duplicating some logic like temp file
>> handling, which is exactly what we did in the original PR.
>> >>> >> I can reuse Compression & Sink<T> for file level writes but that
>> seems about the most I can reuse right now.
>> >>> >>
>> >>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com>
>> wrote:
>> >>> >>>
>> >>> >>> So I spent one afternoon trying some ideas for reusing the last
>> few transforms of WriteFiles.
>> >>> >>>
>> >>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
>> Iterable<UserT>>, FileResult<DestinationT>>
>> >>> >>> => GatherResults<ResultT> extends
>> PTransform<PCollection<ResultT>, PCollection<List<ResultT>>>
>> >>> >>> => FinalizeTempFileBundles extends
>> PTransform<PCollection<List<FileResult<DestinationT>>>,
>> WriteFilesResult<DestinationT>>
>> >>> >>>
>> >>> >>> I replaced FileResult<DestinationT> with KV<DestinationT,
>> ResourceId> so I can use pre-computed SMB destination file names for the
>> transforms.
>> >>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
>> private and easy to change/pull out.
>> >>> >>>
>> >>> >>> OTOH they are somewhat coupled with the package-private
>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
>> temp file handling logic lives). It might be hard to decouple them, either by
>> modifying existing code or by creating new transforms, unless we rewrite most
>> of FileBasedSink from scratch.
>> >>> >>>
>> >>> >>> Let me know if I'm on the wrong track.
>> >>> >>>
>> >>> >>> WIP Branch
>> https://github.com/spotify/beam/tree/neville/write-files
>> >>> >>>
>> >>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <
>> chamikara@google.com> wrote:
>> >>> >>>>
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>> >>>>>
>> >>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>> >>> >>>>> >
>> >>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>> >>>>> >>
>> >>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <
>> neville.lyh@gmail.com> wrote:
>> >>> >>>>> >> >
>> >>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into
>> it and see what needs to be done.
>> >>> >>>>> >> >
>> >>> >>>>> >> > Eugene pointed out that we shouldn't build on
>> FileBased{Source,Sink}. So for writes I'll probably build on top of
>> WriteFiles.
>> >>> >>>>> >>
>> >>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>> >>> >>>>> >>
>> >>> >>>>> >>
>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>> >>> >>>>> >
>> >>> >>>>> > Yeah if possible, parameterize FileIO.Sink.
>> >>> >>>>> > I would recommend against building on top of WriteFiles
>> either. FileIO being implemented on top of WriteFiles was supposed to be a
>> temporary measure - the longer-term plan was to rewrite it from scratch
>> (albeit with a similar structure) and throw away WriteFiles.
>> >>> >>>>> > If possible, I would recommend to pursue this path: if there
>> are parts of WriteFiles you want to reuse, I would recommend to implement
>> them as new transforms, not at all tied to FileBasedSink (but ok if tied to
>> FileIO.Sink), with the goal in mind that FileIO could be rewritten on top
>> of these new transforms, or maybe parts of WriteFiles could be swapped out
>> for them incrementally.
>> >>> >>>>>
>> >>> >>>>> Thanks for the feedback. There's a lot that was done, but
>> looking at
>> >>> >>>>> the code it feels like there's a lot that was not yet done
>> either, and
>> >>> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not
>> finding
>> >>> >>>>> the right docs).
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> I'm also a bit unfamiliar with the original plans for WriteFiles and
>> for updating source interfaces, but I'd prefer not to significantly modify
>> existing IO transforms to suit the SMB use case. If there are existing
>> pieces of code that can be easily reused, that is fine, but existing
>> sources/sinks are designed to perform a PCollection -> file transformation
>> and vice versa with (usually) runner-determined sharding. Things specific
>> to SMB, such as sharding restrictions, writing metadata to a separate file,
>> and reading multiple files from the same abstraction, do not sound like
>> features that should be included in our usual file read/write transforms.
>> >>> >>>>
>> >>> >>>>>
>> >>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered
>> elements across files within a bucket and TBH I'm not even sure where to
>> start.
>> >>> >>>>> >>
>> >>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>> >>> >>>>> >> Iterable<T>. There are existing
>> PTransform<PCollection<ReadableFile>,
>> >>> >>>>> >> PCollection<T>> but such an interface is insufficient to
>> extract
>> >>> >>>>> >> ordered records per shard. It seems the only concrete
>> implementations
>> >>> >>>>> >> are based on FileBasedSource, which we'd like to avoid, but
>> there's no
>> >>> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>> >>> >>>>> >> cumbersome to call (given the reflection machinery involved
>> in
>> >>> >>>>> >> invoking DoFns).
>> >>> >>>>> >
>> >>> >>>>> > Seems easiest to just define a new regular Java interface for
>> this.
>> >>> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or
>> something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void.
>> Depends on how much control over iteration you need.
>> >>> >>>>>
>> >>> >>>>> For this application, one wants to iterate over several files in
>> >>> >>>>> parallel. The downside of a new interface is that it shares
>> almost
>> >>> >>>>> nothing with the "normal" sources (e.g. when features (or
>> >>> >>>>> optimizations) get added to one, they won't get added to the
>> other).
>> >>> >>>>
>> >>> >>>>
>> >>> >>>>>
>> >>> >>>>>
>> >>> >>>>> > And yes, DoFns, including SDFs, are not designed to be used
>> as Java interfaces per se. If you need DoFn machinery in this interface
>> (e.g. side inputs), use Contextful (s.apache.org/context-fn).
>> >>> >>>>>
>> >>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard
>> it is
>> >>> >>>>> to build new DoFns out of others (or, really, use them in any
>> context
>> >>> >>>>> other than as an argument to ParDo).
>> >>> >>>>>
>> >>> >>>>> >> > I'll file separate PRs for core changes needed for
>> discussion. WDYT?
>> >>> >>>>> >>
>> >>> >>>>> >> Sounds good.
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> +1
>> >>> >>>>
>> >>> >>>>>
>> >>> >>>>> >>
>> >>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>> >>>>> >> >>
>> >>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <
>> neville.lyh@gmail.com> wrote:
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > Forking this thread to discuss action items regarding
>> the change. We can keep technical discussion in the original thread.
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > Background: our SMB POC showed promising performance &
>> cost saving improvements and we'd like to adopt it for production soon (by
>> EOY). We want to contribute it to Beam so it's better generalized and
>> maintained. We also want to avoid divergence between our internal version
>> and the PR while it's in progress, specifically any breaking change in the
>> produced SMB data.
>> >>> >>>>> >> >>
>> >>> >>>>> >> >> All good goals.
>> >>> >>>>> >> >>
>> >>> >>>>> >> >> > To achieve that I'd like to propose a few action items.
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy,
>> key handling, bucket file and metadata format, etc., anything that affects
>> produced SMB data.
>> >>> >>>>> >> >> > 2. Revise the existing PR according to #1
>> >>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing
>> FileIO.Sink, Compression, etc., but keep the existing file level abstraction
>> >>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark
>> clearly as @experimental
>> >>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g.
>> ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > #1-4 give us something usable in the short term, while
>> #1 guarantees that production data produced today are usable when #5 lands
>> on master. #4 also gives early adopters a chance to give feedback.
>> >>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a
>> couple of big PRs to achieve, which we can keep iterating on.
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > What are your thoughts on this?
>> >>> >>>>> >> >>
>> >>> >>>>> >> >> I would like to see some resolution on the FileIO
>> abstractions before
>> >>> >>>>> >> >> merging into experimental. (We have a FileBasedSink that
>> would mostly
>> >>> >>>>> >> >> already work, so it's a matter of coming up with an
>> analogous Source
>> >>> >>>>> >> >> interface.) Specifically I would not want to merge a set
>> of per file
>> >>> >>>>> >> >> type smb IOs without a path forward to this or the
>> determination that
>> >>> >>>>> >> >> it's not possible/desirable.
>> >>
>> >>
>> >>
>> >> --
>> >> Cheers,
>> >> Gleb
>>
>
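
Robert's composability requirement quoted above, that WriteFiles(X) and WriteSmbFiles(X) should be able to share the same X for X in {Avro, Parquet, Text, TFRecord, ...}, can be illustrated without Beam. In this minimal sketch the RecordFormat interface is a hypothetical, much simpler stand-in for something like FileIO.Sink; the SMB-specific work (assigning records to buckets and sorting each bucket by key) never looks inside the format:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Hypothetical sketch: SMB bucketing and sorting are independent of the file format. */
public final class SmbWriteSketch {

  /** Hypothetical per-format encoder, standing in for the shared "X" (Avro, text, ...). */
  public interface RecordFormat<T> {
    String encode(T record);
  }

  /** Groups records into numBuckets buckets, sorts each bucket by key, then encodes it. */
  public static <K extends Comparable<K>, T> Map<Integer, List<String>> writeBuckets(
      List<T> records, Function<T, K> keyFn, int numBuckets, RecordFormat<T> format) {
    Map<Integer, List<T>> buckets = new TreeMap<>();
    for (T record : records) {
      // hashCode() is only a stand-in: a real SMB format needs a fixed, documented hash.
      int bucket = Math.floorMod(keyFn.apply(record).hashCode(), numBuckets);
      buckets.computeIfAbsent(bucket, b -> new ArrayList<>()).add(record);
    }
    Map<Integer, List<String>> files = new TreeMap<>();
    for (Map.Entry<Integer, List<T>> entry : buckets.entrySet()) {
      List<T> sorted = new ArrayList<>(entry.getValue());
      sorted.sort(Comparator.comparing(keyFn));
      List<String> encoded = new ArrayList<>();
      for (T record : sorted) {
        encoded.add(format.encode(record));
      }
      files.put(entry.getKey(), encoded);
    }
    return files;
  }

  public static void main(String[] args) {
    List<String> words = Arrays.asList("pear", "apple", "fig", "kiwi", "banana");
    // The same bucketing logic works with any RecordFormat; here, upper-cased text lines.
    RecordFormat<String> upperCaseText = s -> s.toUpperCase();
    System.out.println(writeBuckets(words, s -> s, 2, upperCaseText));
  }
}
```

Swapping upperCaseText for another RecordFormat changes only the encoded output, not which records land in which bucket or their order within it, which is the property that would let one format implementation serve both writers.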

Re: Sort Merge Bucket - Action Items

Posted by Kenneth Knowles <ke...@apache.org>.
There is still considerable value in knowing data sources statically so you
can do things like fetch sizes and other metadata and adjust pipeline
shape. I would not expect to delete these, but to implement them on top of
SDF while still giving them a clear URN and payload so runners can know
that it is a statically-specified source.

Kenn
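
Gleb's diagram in the quoted thread below has SplitIntoRangesFn turn each ReadableFile into KV<ReadableFile, OffsetRange> pairs, which ReadFileRangesFn then reads after a Reshuffle. The splitting step itself is simple arithmetic; here it is as a Beam-free sketch, where OffsetSplitSketch, Range, and desiredBundleSizeBytes are illustrative names rather than Beam's OffsetRange API:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of splitting one file into byte ranges for parallel reads. */
public final class OffsetSplitSketch {

  /** Half-open byte range [from, to) within one file; a simplified stand-in for an offset range. */
  public static final class Range {
    public final long from;
    public final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String toString() {
      return "[" + from + ", " + to + ")";
    }
  }

  /** Splits [0, fileSize) into consecutive ranges of at most desiredBundleSizeBytes bytes. */
  public static List<Range> split(long fileSize, long desiredBundleSizeBytes) {
    if (desiredBundleSizeBytes <= 0) {
      throw new IllegalArgumentException("desiredBundleSizeBytes must be positive");
    }
    List<Range> ranges = new ArrayList<>();
    for (long start = 0; start < fileSize; start += desiredBundleSizeBytes) {
      ranges.add(new Range(start, Math.min(fileSize, start + desiredBundleSizeBytes)));
    }
    return ranges;
  }

  public static void main(String[] args) {
    // Prints [[0, 100), [100, 200), [200, 250)]
    System.out.println(split(250, 100));
  }
}
```

Each range can then be read independently, which is what lets the Reshuffle spread one large file across workers. A record-oriented reader still has to begin at the first record boundary at or after from, which this sketch does not model.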

On Fri, Jul 26, 2019 at 3:23 AM Robert Bradshaw <ro...@google.com> wrote:

> On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov <ki...@google.com>
> wrote:
> >
> > Hi Gleb,
> >
> > Regarding the future of io.Read: ideally things would go as follows
> > - All runners support SDF at feature parity with Read (mostly this is
> just the Dataflow runner's liquid sharding and size estimation for bounded
> sources, and backlog for unbounded sources, but I recall that a couple of
> other runners also used size estimation)
> > - Bounded/UnboundedSource APIs are declared "deprecated" - it is
> forbidden to add any new implementations to SDK, and users shouldn't use
> them either (note: I believe it's already effectively forbidden to use them
> for cases where a DoFn/SDF at the current level of support will be
> sufficient)
> > - People one by one rewrite existing Bounded/UnboundedSource based
> PTransforms in the SDK to use SDFs instead
> > - Read.from() is rewritten to use a wrapper SDF over the given Source,
> and explicit support for Read is deleted from runners
> > - In the next major version of Beam - presumably 3.0 - the Read
> transform itself is deleted
> >
> > I don't know what the current status of SDF/Read feature parity is; maybe
> Luke or Cham can comment. An alternative path is offered in
> http://s.apache.org/sdf-via-source.
>
> Python supports initial splitting for SDF of all sources on portable
> runners. Dataflow support for batch SDF is undergoing testing, not yet
> rolled out. Dataflow support for streaming SDF is awaiting portable
> state/timer support.
>
> > On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov <gl...@spotify.com> wrote:
> >>
> >> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
> away in favor of SDF, or are we always going to have both?
> >>
> >> I was looking into AvroIO.read and AvroIO.readAll; both of them use
> AvroSource. AvroIO.readAll uses SDF and is implemented with
> ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking at
> ReadAllViaFileBasedSource, I don't find it necessary to use Source<?>; it
> should be enough to have something like (KV<ReadableFile, OffsetRange>,
> OutputReceiver<T>), as we have discussed in this thread, and that should be
> fine for SMB as well. It would require duplicating code from AvroSource,
> but in the end, I don't see that as a problem if AvroSource is going away.
> >>
> >> I'm attaching a small diagram I put together for myself to better
> understand the code.
> >>
> >> AvroIO.readAll :: PTransform<PBegin, PCollection<T>> ->
> >>
> >> FileIO.matchAll :: PTransform<PCollection<String>,
> PCollection<MatchResult.Metadata>>
> >> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>,
> PCollection<ReadableFile>>
> >> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>,
> PCollection<T>> ->
> >>
> >> ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>,
> PCollection<T>> ->
> >>
> >> ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile,
> OffsetRange>>) (splittable do fn)
> >>
> >> Reshuffle.viaRandomKey()
> >>
> >> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile,
> OffsetRange>, T>) where
> >>
> >> createSource :: String -> FileBasedSource<T>
> >>
> >> createSource = AvroSource
> >>
> >>
> >> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin,
> PCollection<T>> ->
> >>
> >> Read.Bounded.from(createSource) where
> >>
> >> createSource :: String -> FileBasedSource<T>
> >>
> >> createSource = AvroSource
> >>
> >>
> >> Gleb
> >>
> >>
> >> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>>
> >>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <ke...@apache.org>
> wrote:
> >>> >
> >>> > From the peanut gallery, keeping a separate implementation for SMB
> seems fine. Dependencies are serious liabilities for both upstream and
> downstream. It seems like the reuse angle is generating extra work, and
> potentially making already-complex implementations more complex, instead of
> helping things.
> >>>
> >>> +1
> >>>
> >>> To be clear, what I care about is that WriteFiles(X) and
> >>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
> >>> TFRecord, ...}. In other words composability of the API (vs. manually
> >>> filling out the matrix). If WriteFiles and WriteSmbFiles find
> >>> opportunities for (easy, clean) implementation sharing, that'd be
> >>> nice, but not the primary goal.
> >>>
> >>> (Similarly for reading, though that seems less obvious. Certainly
> >>> whatever T is useful for ReadSmb(T) could also be useful for a
> >>> (non-liquid-sharding) ReadAll(T).)
> >>>
> >>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <ne...@gmail.com>
> wrote:
> >>> >>
> >>> >> I spoke too soon. Turns out for unsharded writes, numShards can't
> be determined until the last finalize transform, which is again different
> from the current SMB proposal (static number of buckets & shards).
> >>> >> I'll end up with more code specialized for SMB in order to
> generalize existing sink code, which I think we all want to avoid.
> >>> >>
> >>> >> Seems the only option is duplicating some logic like temp file
> handling, which is exactly what we did in the original PR.
> >>> >> I can reuse Compression & Sink<T> for file level writes but that
> seems about the most I can reuse right now.
> >>> >>
> >>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com>
> wrote:
> >>> >>>
> >>> >>> So I spent one afternoon trying some ideas for reusing the last
> few transforms of WriteFiles.
> >>> >>>
> >>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
> Iterable<UserT>>, FileResult<DestinationT>>
> >>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
> PCollection<List<ResultT>>>
> >>> >>> => FinalizeTempFileBundles extends
> PTransform<PCollection<List<FileResult<DestinationT>>>,
> WriteFilesResult<DestinationT>>
> >>> >>>
> >>> >>> I replaced FileResult<DestinationT> with KV<DestinationT,
> ResourceId> so that the transforms can use precomputed SMB destination file
> names.
> >>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
> private and easy to change/pull out.
> >>> >>>
> >>> >>> OTOH they are somewhat coupled with the package-private
> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
> the temp file handling logic lives). It might be hard to decouple them,
> either by modifying existing code or by creating new transforms, unless we
> rewrite most of FileBasedSink from scratch.
> >>> >>>
> >>> >>> Let me know if I'm on the wrong track.
> >>> >>>
> >>> >>> WIP Branch
> https://github.com/spotify/beam/tree/neville/write-files
> >>> >>>
> >>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <
> chamikara@google.com> wrote:
> >>> >>>>
> >>> >>>>
> >>> >>>>
> >>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <
> robertwb@google.com> wrote:
> >>> >>>>>
> >>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <
> kirpichov@google.com> wrote:
> >>> >>>>> >
> >>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <
> robertwb@google.com> wrote:
> >>> >>>>> >>
> >>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <
> neville.lyh@gmail.com> wrote:
> >>> >>>>> >> >
> >>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into
> it and see what needs to be done.
> >>> >>>>> >> >
> >>> >>>>> >> > Eugene pointed out that we shouldn't build on
> FileBased{Source,Sink}. So for writes I'll probably build on top of
> WriteFiles.
> >>> >>>>> >>
> >>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
> >>> >>>>> >>
> >>> >>>>> >>
> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
> >>> >>>>> >
> >>> >>>>> > Yeah if possible, parameterize FileIO.Sink.
> >>> >>>>> > I would recommend against building on top of WriteFiles
> either. FileIO being implemented on top of WriteFiles was supposed to be a
> temporary measure - the longer-term plan was to rewrite it from scratch
> (albeit with a similar structure) and throw away WriteFiles.
> >>> >>>>> > If possible, I would recommend pursuing this path: if there
> are parts of WriteFiles you want to reuse, I would recommend implementing
> them as new transforms, not at all tied to FileBasedSink (but ok if tied to
> FileIO.Sink), with the goal in mind that FileIO could be rewritten on top
> of these new transforms, or maybe parts of WriteFiles could be swapped out
> for them incrementally.
> >>> >>>>>
> >>> >>>>> Thanks for the feedback. There's a lot that was done, but
> looking at
> >>> >>>>> the code it feels like there's a lot that was not yet done
> either, and
> >>> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not
> finding
> >>> >>>>> the right docs).
> >>> >>>>
> >>> >>>>
> >>> >>>> I'm also a bit unfamiliar with the original plans for WriteFiles and
> for updating source interfaces, but I'd prefer not to significantly modify
> existing IO transforms to suit the SMB use case. If there are existing
> pieces of code that can be easily reused, that is fine, but existing
> sources/sinks are designed to perform a PCollection -> file transformation
> and vice versa with (usually) runner-determined sharding. Things specific
> to SMB, such as sharding restrictions, writing metadata to a separate file,
> and reading multiple files from the same abstraction, do not sound like
> features that should be included in our usual file read/write transforms.
> >>> >>>>
> >>> >>>>>
> >>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered
> elements across files within a bucket and TBH I'm not even sure where to
> start.
> >>> >>>>> >>
> >>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
> >>> >>>>> >> Iterable<T>. There are existing
> PTransform<PCollection<ReadableFile>,
> >>> >>>>> >> PCollection<T>> but such an interface is insufficient to
> extract
> >>> >>>>> >> ordered records per shard. It seems the only concrete
> implementations
> >>> >>>>> >> are based on FileBasedSource, which we'd like to avoid, but
> there's no
> >>> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and
> >>> >>>>> >> cumbersome to call (given the reflection machinery involved in
> >>> >>>>> >> invoking DoFns).
> >>> >>>>> >
> >>> >>>>> > Seems easiest to just define a new regular Java interface for
> this.
> >>> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or
> something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void.
> Depends on how much control over iteration you need.
> >>> >>>>>
> >>> >>>>> For this application, one wants to iterate over several files in
> >>> >>>>> parallel. The downside of a new interface is that it shares
> almost
> >>> >>>>> nothing with the "normal" sources (e.g. when features (or
> >>> >>>>> optimizations) get added to one, they won't get added to the
> other).
> >>> >>>>
> >>> >>>>
> >>> >>>>>
> >>> >>>>>
> >>> >>>>> > And yes, DoFn's including SDF's are not designed to be used as
> Java interfaces per se. If you need DoFn machinery in this interface (e.g.
> side inputs), use Contextful - s.apache.org/context-fn.
> >>> >>>>>
> >>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard
> it is
> >>> >>>>> to build new DoFns out of others (or, really, use them in any
> context
> >>> >>>>> other than as an argument to ParDo).
> >>> >>>>>
> >>> >>>>> >> > I'll file separate PRs for core changes needed for
> discussion. WDYT?
> >>> >>>>> >>
> >>> >>>>> >> Sounds good.
> >>> >>>>
> >>> >>>>
> >>> >>>> +1
> >>> >>>>
> >>> >>>>>
> >>> >>>>> >>
> >>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <
> robertwb@google.com> wrote:
> >>> >>>>> >> >>
> >>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <
> neville.lyh@gmail.com> wrote:
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > Forking this thread to discuss action items regarding
> the change. We can keep technical discussion in the original thread.
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > Background: our SMB POC showed promising performance &
> cost saving improvements and we'd like to adopt it for production soon (by
> EOY). We want to contribute it to Beam so it's better generalized and
> maintained. We also want to avoid divergence between our internal version
> and the PR while it's in progress, specifically any breaking change in the
> produced SMB data.
> >>> >>>>> >> >>
> >>> >>>>> >> >> All good goals.
> >>> >>>>> >> >>
> >>> >>>>> >> >> > To achieve that I'd like to propose a few action items.
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy,
> key handling, bucket file and metadata format, etc., anything that affects
> produced SMB data.
> >>> >>>>> >> >> > 2. Revise the existing PR according to #1
> >>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing
> FileIO.Sink, Compression, etc., but keep the existing file level abstraction
> >>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark
> clearly as @experimental
> >>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g.
> ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > #1-4 gives us something usable in the short term, while
> #1 guarantees that production data produced today are usable when #5 lands
> on master. #4 also gives early adopters a chance to give feedback.
> >>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a
> couple of big PRs to achieve, which we can keep iterating on.
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > What are your thoughts on this?
> >>> >>>>> >> >>
> >>> >>>>> >> >> I would like to see some resolution on the FileIO
> abstractions before
> >>> >>>>> >> >> merging into experimental. (We have a FileBasedSink that
> would mostly
> >>> >>>>> >> >> already work, so it's a matter of coming up with an
> analogous Source
> >>> >>>>> >> >> interface.) Specifically I would not want to merge a set
> of per file
> >>> >>>>> >> >> type smb IOs without a path forward to this or the
> determination that
> >>> >>>>> >> >> it's not possible/desirable.
> >>
> >>
> >>
> >> --
> >> Cheers,
> >> Gleb
>

Re: Sort Merge Bucket - Action Items

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov <ki...@google.com> wrote:
>
> Hi Gleb,
>
> Regarding the future of io.Read: ideally things would go as follows
> - All runners support SDF at feature parity with Read (mostly this is just the Dataflow runner's liquid sharding and size estimation for bounded sources, and backlog for unbounded sources, but I recall that a couple of other runners also used size estimation)
> - Bounded/UnboundedSource APIs are declared "deprecated" - it is forbidden to add any new implementations to SDK, and users shouldn't use them either (note: I believe it's already effectively forbidden to use them for cases where a DoFn/SDF at the current level of support will be sufficient)
> - People one by one rewrite existing Bounded/UnboundedSource based PTransforms in the SDK to use SDFs instead
> - Read.from() is rewritten to use a wrapper SDF over the given Source, and explicit support for Read is deleted from runners
> - In the next major version of Beam - presumably 3.0 - the Read transform itself is deleted
>
> I don't know what the current status of SDF/Read feature parity is; maybe Luke or Cham can comment. An alternative path is offered in http://s.apache.org/sdf-via-source.

Python supports initial splitting for SDF of all sources on portable
runners. Dataflow support for batch SDF is undergoing testing, not yet
rolled out. Dataflow support for streaming SDF is awaiting portable
state/timer support.

> On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov <gl...@spotify.com> wrote:
>>
>> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going away in favor of SDF, or are we always going to have both?
>>
>> I was looking into AvroIO.read and AvroIO.readAll; both of them use AvroSource. AvroIO.readAll is using SDF, and it's implemented with ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking at ReadAllViaFileBasedSource, I don't think it's necessary to use Source<?>; it should be enough to have something like (KV<ReadableFile, OffsetRange>, OutputReceiver<T>), as we have discussed in this thread, and that should be fine for SMB as well. It would require duplicating code from AvroSource, but in the end, I don't see it as a problem if AvroSource is going away.
>>
>> I'm attaching a small diagram I put together for myself to better understand the code.
>>
>> AvroIO.readAll :: PTransform<PCollection<String>, PCollection<T>> ->
>>
>> FileIO.matchAll :: PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
>> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>>
>> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> ->
>>
>> ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>, PCollection<T>> ->
>>
>> ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile, OffsetRange>>) (splittable do fn)
>>
>> Reshuffle.viaRandomKey()
>>
>> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile, OffsetRange>, T>) where
>>
>> createSource :: String -> FileBasedSource<T>
>>
>> createSource = AvroSource
>>
>>
>> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin, PCollection<T>> ->
>>
>> Read.Bounded.from(createSource) where
>>
>> createSource :: String -> FileBasedSource<T>
>>
>> createSource = AvroSource
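The (KV<ReadableFile, OffsetRange>, OutputReceiver<T>) shape from the message above can be illustrated without any Source<?>. What follows is a minimal, hypothetical sketch in plain Java: ByteRange, RangeReader, RangeSplitter and InMemoryLineReader are names invented here, not Beam classes, and the reader works on in-memory ASCII text instead of a real ReadableFile. It splits a file into fixed-size offset ranges and lets each range emit only the records that start inside it. That convention is a common one, and it ensures that adjacent ranges never duplicate or drop a record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Half-open byte range [from, to); a plain stand-in for Beam's OffsetRange. */
final class ByteRange {
  final long from;
  final long to;

  ByteRange(long from, long to) {
    if (from > to) throw new IllegalArgumentException("from > to");
    this.from = from;
    this.to = to;
  }
}

/** Hypothetical reader shape (file, range, receiver) -> void; no Source<?> involved. */
interface RangeReader<T> {
  void read(String fileName, ByteRange range, Consumer<T> out);
}

final class RangeSplitter {
  /** Splits [0, sizeBytes) into consecutive ranges of at most desiredBytes each. */
  static List<ByteRange> split(long sizeBytes, long desiredBytes) {
    if (desiredBytes <= 0) throw new IllegalArgumentException("desiredBytes must be > 0");
    List<ByteRange> ranges = new ArrayList<>();
    for (long start = 0; start < sizeBytes; start += desiredBytes) {
      ranges.add(new ByteRange(start, Math.min(start + desiredBytes, sizeBytes)));
    }
    return ranges;
  }
}

/**
 * Toy newline-delimited reader over in-memory ASCII files (char offsets equal byte offsets).
 * A range emits exactly the records that START inside it. For simplicity it scans from
 * offset 0; a real reader would seek near `from` and skip to the next record boundary.
 */
final class InMemoryLineReader implements RangeReader<String> {
  private final Map<String, String> files;

  InMemoryLineReader(Map<String, String> files) {
    this.files = files;
  }

  @Override
  public void read(String fileName, ByteRange range, Consumer<String> out) {
    String content = files.get(fileName);
    int lineStart = 0;
    while (lineStart < content.length() && lineStart < range.to) {
      int newline = content.indexOf('\n', lineStart);
      int lineEnd = newline < 0 ? content.length() : newline;
      if (lineStart >= range.from) {
        out.accept(content.substring(lineStart, lineEnd));
      }
      lineStart = lineEnd + 1;
    }
  }
}
```

Because each record is owned by the range containing its first byte, a record such as "bb" that straddles a range boundary is emitted once, by the earlier range.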
>>
>>
>> Gleb
>>
>>
>> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com> wrote:
>>>
>>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <ke...@apache.org> wrote:
>>> >
>>> > From the peanut gallery, keeping a separate implementation for SMB seems fine. Dependencies are serious liabilities for both upstream and downstream. It seems like the reuse angle is generating extra work, and potentially making already-complex implementations more complex, instead of helping things.
>>>
>>> +1
>>>
>>> To be clear, what I care about is that WriteFiles(X) and
>>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>>> TFRecord, ...}. In other words composability of the API (vs. manually
>>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>>> opportunities for (easy, clean) implementation sharing, that'd be
>>> nice, but not the primary goal.
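Here is a concrete version of the composability point, with the same X plugged into both a plain writer and an SMB-style writer. This is a minimal sketch under stated assumptions. RecordSink, TextRecordSink and Writers are hypothetical names. RecordSink only loosely mirrors the open/write/flush shape of FileIO.Sink, and it uses an OutputStream instead of a WritableByteChannel. Bucketing by hashCode() is also an illustrative assumption. Real SMB data would presumably need a hash that stays stable across languages and versions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical per-file format, loosely modeled on the open/write/flush shape of FileIO.Sink. */
interface RecordSink<T> {
  void open(OutputStream out) throws IOException;

  void write(T element) throws IOException;

  void flush() throws IOException;
}

/** One concrete format "X": newline-delimited UTF-8 text. */
final class TextRecordSink implements RecordSink<String> {
  private OutputStream out;

  @Override
  public void open(OutputStream out) {
    this.out = out;
  }

  @Override
  public void write(String element) throws IOException {
    out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public void flush() throws IOException {
    out.flush();
  }
}

final class Writers {
  /** Plain write: all elements into one in-memory "file", in input order. */
  static <T> String writeAll(List<T> elements, Supplier<RecordSink<T>> format) throws IOException {
    ByteArrayOutputStream file = new ByteArrayOutputStream();
    RecordSink<T> sink = format.get();
    sink.open(file);
    for (T element : elements) {
      sink.write(element);
    }
    sink.flush();
    return new String(file.toByteArray(), StandardCharsets.UTF_8);
  }

  /** SMB-style write: bucket by key hash, sort each bucket by key, reuse the same format. */
  static <T, K extends Comparable<K>> List<String> writeBucketed(
      List<T> elements, Function<T, K> keyFn, int numBuckets, Supplier<RecordSink<T>> format)
      throws IOException {
    List<List<T>> buckets = new ArrayList<>();
    for (int i = 0; i < numBuckets; i++) {
      buckets.add(new ArrayList<>());
    }
    for (T element : elements) {
      // Assumption: hashCode() for illustration only; real SMB data needs a stable hash.
      int bucket = Math.floorMod(keyFn.apply(element).hashCode(), numBuckets);
      buckets.get(bucket).add(element);
    }
    List<String> files = new ArrayList<>();
    for (List<T> bucket : buckets) {
      bucket.sort(Comparator.comparing(keyFn));
      files.add(writeAll(bucket, format));
    }
    return files;
  }
}
```

Only the writers know about bucketing and sorting. TextRecordSink is unaware of SMB, which is the property that avoids filling out the {format} x {writer} matrix by hand.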
>>>
>>> (Similarly for reading, though that seems less obvious. Certainly
>>> whatever T is useful for ReadSmb(T) could be useful for a
>>> (non-liquid-sharding) ReadAll(T) however.)
>>>
>>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <ne...@gmail.com> wrote:
>>> >>
>>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be determined until the last finalize transform, which is again different from the current SMB proposal (static number of buckets & shards).
>>> >> I'll end up with more code specialized for SMB in order to generalize existing sink code, which I think we all want to avoid.
>>> >>
>>> >> Seems the only option is duplicating some logic like temp file handling, which is exactly what we did in the original PR.
>>> >> I can reuse Compression & Sink<T> for file level writes but that seems about the most I can reuse right now.
>>> >>
>>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com> wrote:
>>> >>>
>>> >>> So I spent one afternoon trying some ideas for reusing the last few transforms of WriteFiles.
>>> >>>
>>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>>
>>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>>
>>> >>> => FinalizeTempFileBundles extends PTransform<PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>
>>> >>>
>>> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so that the transforms can use precomputed SMB destination file names.
>>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are private and easy to change/pull out.
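The SMB proposal fixes the number of buckets and shards up front, so every destination file name can be computed before any record is written. That is what makes a precomputed KV<DestinationT, ResourceId> workable. A minimal sketch: the naming pattern and the SmbFileNaming helper are hypothetical, chosen only for illustration, and plain strings stand in for ResourceId.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

final class SmbFileNaming {
  /** Hypothetical naming pattern, for illustration only. */
  static String fileName(int bucket, int numBuckets, int shard, int numShards, String suffix) {
    return String.format(
        Locale.ROOT,
        "bucket-%05d-of-%05d-shard-%05d-of-%05d%s",
        bucket, numBuckets, shard, numShards, suffix);
  }

  /**
   * Maps each "bucket/shard" id to its final path. This works only because numBuckets and
   * numShards are fixed before writing, unlike runner-determined sharding, where the shard
   * count may not be known until finalization.
   */
  static Map<String, String> destinations(String dir, int numBuckets, int numShards, String suffix) {
    Map<String, String> out = new LinkedHashMap<>();
    for (int bucket = 0; bucket < numBuckets; bucket++) {
      for (int shard = 0; shard < numShards; shard++) {
        out.put(bucket + "/" + shard, dir + "/" + fileName(bucket, numBuckets, shard, numShards, suffix));
      }
    }
    return out;
  }
}
```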
>>> >>>
>>> >>> OTOH they are somewhat coupled with the package-private {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of the temp file handling logic lives). It might be hard to decouple them, either by modifying existing code or by creating new transforms, unless we rewrite most of FileBasedSink from scratch.
>>> >>>
>>> >>> Let me know if I'm on the wrong track.
>>> >>>
>>> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>>> >>>
>>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <ch...@google.com> wrote:
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <ro...@google.com> wrote:
>>> >>>>>
>>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <ki...@google.com> wrote:
>>> >>>>> >
>>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <ro...@google.com> wrote:
>>> >>>>> >>
>>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <ne...@gmail.com> wrote:
>>> >>>>> >> >
>>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and see what needs to be done.
>>> >>>>> >> >
>>> >>>>> >> > Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So for writes I'll probably build on top of WriteFiles.
>>> >>>>> >>
>>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>>> >>>>> >>
>>> >>>>> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>>> >>>>> >
>>> >>>>> > Yeah if possible, parameterize FileIO.Sink.
>>> >>>>> > I would recommend against building on top of WriteFiles either. FileIO being implemented on top of WriteFiles was supposed to be a temporary measure - the longer-term plan was to rewrite it from scratch (albeit with a similar structure) and throw away WriteFiles.
>>> >>>>> > If possible, I would recommend pursuing this path: if there are parts of WriteFiles you want to reuse, I would recommend implementing them as new transforms, not at all tied to FileBasedSink (but ok if tied to FileIO.Sink), with the goal in mind that FileIO could be rewritten on top of these new transforms, or maybe parts of WriteFiles could be swapped out for them incrementally.
>>> >>>>>
>>> >>>>> Thanks for the feedback. There's a lot that was done, but looking at
>>> >>>>> the code it feels like there's a lot that was not yet done either, and
>>> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not finding
>>> >>>>> the right docs).
>>> >>>>
>>> >>>>
>>> >>>> I'm also a bit unfamiliar with the original plans for WriteFiles and for updating source interfaces, but I'd prefer not to significantly modify existing IO transforms to suit the SMB use case. If there are existing pieces of code that can be easily reused, that is fine, but existing sources/sinks are designed to perform a PCollection -> file transformation and vice versa with (usually) runner-determined sharding. Things specific to SMB, such as sharding restrictions, writing metadata to a separate file, and reading multiple files from the same abstraction, do not sound like features that should be included in our usual file read/write transforms.
>>> >>>>
>>> >>>>>
>>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered elements across files within a bucket and TBH I'm not even sure where to start.
>>> >>>>> >>
>>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>>> >>>>> >> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>>> >>>>> >> PCollection<T>> but such an interface is insufficient to extract
>>> >>>>> >> ordered records per shard. It seems the only concrete implementations
>>> >>>>> >> are based on FileBasedSource, which we'd like to avoid, but there's no
>>> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>>> >>>>> >> cumbersome to call (given the reflection machinery involved in
>>> >>>>> >> invoking DoFns).
>>> >>>>> >
>>> >>>>> > Seems easiest to just define a new regular Java interface for this.
>>> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how much control over iteration you need.
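The two candidate shapes differ in who drives iteration. Here is a hypothetical sketch that makes the asymmetry visible. PullReader, PushReader and ReaderAdapters are invented names, F stands in for ReadableFile, and Consumer<T> stands in for OutputReceiver<T>. Turning the pull shape into the push shape is just a loop. Going the other way without extra threads means running the reader to completion and buffering the whole file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical pull shape: file -> Iterable<T>. The caller decides when to advance. */
interface PullReader<F, T> {
  Iterable<T> read(F file);
}

/** Hypothetical push shape: (file, receiver) -> void. The reader drives the loop. */
interface PushReader<F, T> {
  void read(F file, Consumer<T> receiver);
}

final class ReaderAdapters {
  /** Pull -> push is a simple loop that forwards every record. */
  static <F, T> PushReader<F, T> toPush(PullReader<F, T> pull) {
    return (file, receiver) -> {
      for (T record : pull.read(file)) {
        receiver.accept(record);
      }
    };
  }

  /**
   * Push -> pull without extra threads has to run the reader to completion first, so the
   * whole file ends up buffered in memory, which makes it a poor fit for large files.
   */
  static <F, T> PullReader<F, T> toPullBuffered(PushReader<F, T> push) {
    return file -> {
      List<T> buffer = new ArrayList<>();
      push.read(file, buffer::add);
      return buffer;
    };
  }
}
```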
>>> >>>>>
>>> >>>>> For this application, one wants to iterate over several files in
>>> >>>>> parallel. The downside of a new interface is that it shares almost
>>> >>>>> nothing with the "normal" sources (e.g. when features (or
>>> >>>>> optimizations) get added to one, they won't get added to the other).
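Iterating over several sorted files in parallel within one bucket amounts to a k-way merge of already-sorted streams. Here is a minimal sketch, assuming each file is exposed as a sorted Iterator<T> (the pull shape). SortedMerge is a hypothetical helper, not a Beam API. It keeps at most one buffered record per file in a min-heap, so memory use grows with the number of files, not their size.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

final class SortedMerge {
  /** Current head record of one source, plus the rest of that source. */
  private static final class Head<T> {
    final T record;
    final Iterator<T> rest;

    Head(T record, Iterator<T> rest) {
      this.record = record;
      this.rest = rest;
    }
  }

  /** Lazily merges sorted iterators into one sorted iterator (k-way merge with a min-heap). */
  static <T> Iterator<T> merge(List<Iterator<T>> sources, Comparator<? super T> order) {
    Comparator<Head<T>> byRecord = (a, b) -> order.compare(a.record, b.record);
    PriorityQueue<Head<T>> heap = new PriorityQueue<>(byRecord);
    for (Iterator<T> source : sources) {
      if (source.hasNext()) {
        heap.add(new Head<>(source.next(), source));
      }
    }
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return !heap.isEmpty();
      }

      @Override
      public T next() {
        Head<T> head = heap.poll();
        if (head == null) {
          throw new NoSuchElementException();
        }
        // Refill from the source just consumed, so each non-empty source keeps one record queued.
        if (head.rest.hasNext()) {
          heap.add(new Head<>(head.rest.next(), head.rest));
        }
        return head.record;
      }
    };
  }
}
```

The same pattern would work whether the sources are shards of one bucket or buckets from different datasets being joined, as long as all of them are sorted by the same key order.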
>>> >>>>
>>> >>>>
>>> >>>>>
>>> >>>>>
>>> >>>>> > And yes, DoFn's including SDF's are not designed to be used as Java interfaces per se. If you need DoFn machinery in this interface (e.g. side inputs), use Contextful - s.apache.org/context-fn.
>>> >>>>>
>>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>>> >>>>> to build new DoFns out of others (or, really, use them in any context
>>> >>>>> other than as an argument to ParDo).
>>> >>>>>
>>> >>>>> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
>>> >>>>> >>
>>> >>>>> >> Sounds good.
>>> >>>>
>>> >>>>
>>> >>>> +1
>>> >>>>
>>> >>>>>
>>> >>>>> >>
>>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <ro...@google.com> wrote:
>>> >>>>> >> >>
>>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <ne...@gmail.com> wrote:
>>> >>>>> >> >> >
>>> >>>>> >> >> > Forking this thread to discuss action items regarding the change. We can keep technical discussion in the original thread.
>>> >>>>> >> >> >
>>> >>>>> >> >> > Background: our SMB POC showed promising performance & cost saving improvements and we'd like to adopt it for production soon (by EOY). We want to contribute it to Beam so it's better generalized and maintained. We also want to avoid divergence between our internal version and the PR while it's in progress, specifically any breaking change in the produced SMB data.
>>> >>>>> >> >>
>>> >>>>> >> >> All good goals.
>>> >>>>> >> >>
>>> >>>>> >> >> > To achieve that I'd like to propose a few action items.
>>> >>>>> >> >> >
>>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key handling, bucket file and metadata format, etc., anything that affects produced SMB data.
>>> >>>>> >> >> > 2. Revise the existing PR according to #1
>>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., but keep the existing file level abstraction
>>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark clearly as @experimental
>>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>>> >>>>> >> >> >
>>> >>>>> >> >> > #1-4 gives us something usable in the short term, while #1 guarantees that production data produced today are usable when #5 lands on master. #4 also gives early adopters a chance to give feedback.
>>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a couple of big PRs to achieve, which we can keep iterating on.
>>> >>>>> >> >> >
>>> >>>>> >> >> > What are your thoughts on this?
>>> >>>>> >> >>
>>> >>>>> >> >> I would like to see some resolution on the FileIO abstractions before
>>> >>>>> >> >> merging into experimental. (We have a FileBasedSink that would mostly
>>> >>>>> >> >> already work, so it's a matter of coming up with an analogous Source
>>> >>>>> >> >> interface.) Specifically I would not want to merge a set of per file
>>> >>>>> >> >> type smb IOs without a path forward to this or the determination that
>>> >>>>> >> >> it's not possible/desirable.
>>
>>
>>
>> --
>> Cheers,
>> Gleb

Re: Sort Merge Bucket - Action Items

Posted by Eugene Kirpichov <ki...@google.com>.
Hi Gleb,

Regarding the future of io.Read: ideally things would go as follows
- All runners support SDF at feature parity with Read (mostly this is just
the Dataflow runner's liquid sharding and size estimation for bounded
sources, and backlog for unbounded sources, but I recall that a couple of
other runners also used size estimation)
- Bounded/UnboundedSource APIs are declared "deprecated" - it is forbidden
to add any new implementations to SDK, and users shouldn't use them either
(note: I believe it's already effectively forbidden to use them for cases
where a DoFn/SDF at the current level of support will be sufficient)
- People one by one rewrite existing Bounded/UnboundedSource based
PTransforms in the SDK to use SDFs instead
- Read.from() is rewritten to use a wrapper SDF over the given Source, and
explicit support for Read is deleted from runners
- In the next major version of Beam - presumably 3.0 - the Read transform
itself is deleted

I don't know what the current status of SDF/Read feature parity is; maybe
Luke or Cham can comment. An alternative path is offered in
http://s.apache.org/sdf-via-source.


On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov <gl...@spotify.com> wrote:

> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
> away in favor of SDF, or are we always going to have both?
>
> I was looking into AvroIO.read and AvroIO.readAll; both of them
> use AvroSource. AvroIO.readAll is using SDF, and it's implemented with
> ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking at
> ReadAllViaFileBasedSource, I don't think it's necessary to use Source<?>; it
> should be enough to have something like (KV<ReadableFile, OffsetRange>,
> OutputReceiver<T>), as we have discussed in this thread, and that should be
> fine for SMB as well. It would require duplicating code from AvroSource,
> but in the end, I don't see it as a problem if AvroSource is going away.
>
> I'm attaching a small diagram I put together for myself to better
> understand the code.
>
> AvroIO.readAll :: PTransform<PCollection<String>, PCollection<T>> ->
>
> FileIO.matchAll :: PTransform<PCollection<String>,
> PCollection<MatchResult.Metadata>>
> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>,
> PCollection<ReadableFile>>
> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>,
> PCollection<T>> ->
>
> ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>,
> PCollection<T>> ->
>
> ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile,
> OffsetRange>>) (splittable do fn)
>
> Reshuffle.viaRandomKey()
>
> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile,
> OffsetRange>, T>) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin,
> PCollection<T>> ->
>
> Read.Bounded.from(createSource) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> Gleb
>
>
> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <ke...@apache.org> wrote:
>> >
>> > From the peanut gallery, keeping a separate implementation for SMB
>> seems fine. Dependencies are serious liabilities for both upstream and
>> downstream. It seems like the reuse angle is generating extra work, and
>> potentially making already-complex implementations more complex, instead of
>> helping things.
>>
>> +1
>>
>> To be clear, what I care about is that WriteFiles(X) and
>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>> TFRecord, ...}. In other words composability of the API (vs. manually
>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>> opportunities for (easy, clean) implementation sharing, that'd be
>> nice, but not the primary goal.
>>
>> (Similarly for reading, though that seems less obvious. Certainly
>> whatever T is useful for ReadSmb(T) could be useful for a
>> (non-liquid-sharding) ReadAll(T) however.)
>>
>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <ne...@gmail.com>
>> wrote:
>> >>
>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be
>> determined until the last finalize transform, which is again different from
>> the current SMB proposal (static number of buckets & shards).
>> >> I'll end up with more code specialized for SMB in order to generalize
>> existing sink code, which I think we all want to avoid.
>> >>
>> >> Seems the only option is duplicating some logic like temp file
>> handling, which is exactly what we did in the original PR.
>> >> I can reuse Compression & Sink<T> for file level writes but that seems
>> about the most I can reuse right now.
>> >>
>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com>
>> wrote:
>> >>>
>> >>> So I spent one afternoon trying some ideas for reusing the last few
>> transforms of WriteFiles.
>> >>>
>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
>> Iterable<UserT>>, FileResult<DestinationT>>
>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
>> PCollection<List<ResultT>>>
>> >>> => FinalizeTempFileBundles extends
>> PTransform<PCollection<List<FileResult<DestinationT>>>,
>> WriteFilesResult<DestinationT>>
>> >>>
>> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId>
>> so that the transforms can use precomputed SMB destination file names.
>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
>> private and easy to change/pull out.
>> >>>
>> >>> OTOH they are somewhat coupled with the package-private
>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
>> the temp file handling logic lives). It might be hard to decouple them,
>> either by modifying existing code or by creating new transforms, unless we
>> rewrite most of FileBasedSink from scratch.
>> >>>
>> >>> Let me know if I'm on the wrong track.
>> >>>
>> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>> >>>
>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <
>> chamikara@google.com> wrote:
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>>>>
>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>> >>>>> >
>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>>>> >>
>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <
>> neville.lyh@gmail.com> wrote:
>> >>>>> >> >
>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it
>> and see what needs to be done.
>> >>>>> >> >
>> >>>>> >> > Eugene pointed out that we shouldn't build on
>> FileBased{Source,Sink}. So for writes I'll probably build on top of
>> WriteFiles.
>> >>>>> >>
>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>> >>>>> >>
>> >>>>> >>
>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>> >>>>> >
>> >>>>> > Yeah if possible, parameterize FileIO.Sink.
>> >>>>> > I would recommend against building on top of WriteFiles either.
>> FileIO being implemented on top of WriteFiles was supposed to be a
>> temporary measure - the longer-term plan was to rewrite it from scratch
>> (albeit with a similar structure) and throw away WriteFiles.
>> >>>>> > If possible, I would recommend pursuing this path: if there are
>> parts of WriteFiles you want to reuse, I would recommend implementing them
>> as new transforms, not at all tied to FileBasedSink (but ok if tied to
>> FileIO.Sink), with the goal in mind that FileIO could be rewritten on top
>> of these new transforms, or maybe parts of WriteFiles could be swapped out
>> for them incrementally.
>> >>>>>
>> >>>>> Thanks for the feedback. There's a lot that was done, but looking at
>> >>>>> the code it feels like there's a lot that was not yet done either,
>> and
>> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not
>> finding
>> >>>>> the right docs).
>> >>>>
>> >>>>
>> >>>> I'm also a bit unfamiliar with the original plans for WriteFiles and for
>> updating source interfaces, but I'd prefer not to significantly modify
>> existing IO transforms to suit the SMB use case. If there are existing
>> pieces of code that can be easily reused, that is fine, but existing
>> sources/sinks are designed to perform a PCollection -> file transformation
>> and vice versa with (usually) runner-determined sharding. Things specific
>> to SMB, such as sharding restrictions, writing metadata to a separate file,
>> and reading multiple files from the same abstraction, do not sound like
>> features that should be included in our usual file read/write transforms.
>> >>>>
>> >>>>>
>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered
>> elements across files within a bucket and TBH I'm not even sure where to
>> start.
>> >>>>> >>
>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>> >>>>> >> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>> >>>>> >> PCollection<T>> but such an interface is insufficient to extract
>> >>>>> >> ordered records per shard. It seems the only concrete implementations
>> >>>>> >> are based on FileBasedSource, which we'd like to avoid, but there's no
>> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>> >>>>> >> cumbersome to call (given the reflection machinery involved in
>> >>>>> >> invoking DoFns).
>> >>>>> >
>> >>>>> > Seems easiest to just define a new regular Java interface for this.
>> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something
>> >>>>> > analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on
>> >>>>> > how much control over iteration you need.
>> >>>>>
>> >>>>> For this application, one wants to iterate over several files in
>> >>>>> parallel. The downside of a new interface is that it shares almost
>> >>>>> nothing with the "normal" sources (e.g. when features (or
>> >>>>> optimizations) get added to one, they won't get added to the other).
>> >>>>
>> >>>>
>> >>>>>
>> >>>>>
>> >>>>> > And yes, DoFns, including SDFs, are not designed to be used as Java
>> >>>>> > interfaces per se. If you need DoFn machinery in this interface (e.g.
>> >>>>> > side inputs), use Contextful - s.apache.org/context-fn.
>> >>>>>
>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>> >>>>> to build new DoFns out of others (or, really, use them in any context
>> >>>>> other than as an argument to ParDo).
>> >>>>>
>> >>>>> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
>> >>>>> >>
>> >>>>> >> Sounds good.
>> >>>>
>> >>>>
>> >>>> +1
>> >>>>
>> >>>>>
>> >>>>> >>
>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <robertwb@google.com> wrote:
>> >>>>> >> >>
>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <neville.lyh@gmail.com> wrote:
>> >>>>> >> >> >
>> >>>>> >> >> > Forking this thread to discuss action items regarding the
>> change. We can keep technical discussion in the original thread.
>> >>>>> >> >> >
>> >>>>> >> >> > Background: our SMB POC showed promising performance & cost
>> saving improvements and we'd like to adopt it for production soon (by EOY).
>> We want to contribute it to Beam so it's better generalized and maintained.
>> We also want to avoid divergence between our internal version and the PR
>> while it's in progress, specifically any breaking change in the produced
>> SMB data.
>> >>>>> >> >>
>> >>>>> >> >> All good goals.
>> >>>>> >> >>
>> >>>>> >> >> > To achieve that I'd like to propose a few action items.
>> >>>>> >> >> >
>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
>> >>>>> >> >> > handling, bucket file and metadata format, etc., anything that
>> >>>>> >> >> > affects produced SMB data.
>> >>>>> >> >> > 2. Revise the existing PR according to #1
>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
>> Compression, etc., but keep the existing file level abstraction
>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark
>> clearly as @experimental
>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
>> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>> >>>>> >> >> >
>> >>>>> >> >> > #1-4 gives us something usable in the short term, while #1
>> guarantees that production data produced today are usable when #5 lands on
>> master. #4 also gives early adopters a chance to give feedback.
>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a
>> couple of big PRs to achieve, which we can keep iterating on.
>> >>>>> >> >> >
>> >>>>> >> >> > What are your thoughts on this?
>> >>>>> >> >>
>> >>>>> >> >> I would like to see some resolution on the FileIO abstractions before
>> >>>>> >> >> merging into experimental. (We have a FileBasedSink that would mostly
>> >>>>> >> >> already work, so it's a matter of coming up with an analogous Source
>> >>>>> >> >> interface.) Specifically I would not want to merge a set of per file
>> >>>>> >> >> type smb IOs without a path forward to this or the determination that
>> >>>>> >> >> it's not possible/desirable.
>>
>
>
> --
> Cheers,
> Gleb
>
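Action item #1 asks for agreement on everything that shapes the bytes on disk before any of it ships. As a rough illustration of why the bucket/shard layout is the part that cannot change later, here is a minimal sketch of a static layout in Java. `BucketLayout`, the CRC32 key hash and the file-name pattern are assumptions made for illustration; they are not the hash or naming used in the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Hypothetical static SMB layout: each key maps to one of numBuckets buckets,
 * and each bucket is written as numShards files. CRC32 and the file-name
 * pattern are illustrative assumptions, not the format used in the PR.
 */
final class BucketLayout {
  final int numBuckets;
  final int numShards;

  BucketLayout(int numBuckets, int numShards) {
    if (numBuckets <= 0 || numShards <= 0) {
      throw new IllegalArgumentException("numBuckets and numShards must be positive");
    }
    this.numBuckets = numBuckets;
    this.numShards = numShards;
  }

  /** Deterministic: the same key always lands in the same bucket, on every run. */
  int bucketFor(String key) {
    CRC32 crc = new CRC32();
    crc.update(key.getBytes(StandardCharsets.UTF_8));
    // getValue() is an unsigned 32-bit value held in a long, so the remainder is never negative.
    return (int) (crc.getValue() % numBuckets);
  }

  /** Destination names are fully known before any record is written. */
  String fileName(int bucket, int shard) {
    return String.format(
        "bucket-%05d-of-%05d-shard-%05d-of-%05d.avro", bucket, numBuckets, shard, numShards);
  }
}
```

Since `bucketFor` and `fileName` decide where every record lives, changing either after production data exists would leave old and new outputs unable to be merged bucket by bucket. A static bucket and shard count also means every destination name is known before writing starts, which, as noted later in the thread, is not the case for an unsharded FileIO write.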

Re: Sort Merge Bucket - Action Items

Posted by Gleb Kanterov <gl...@spotify.com>.
What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
away in favor of SDF, or are we always going to have both?

I was looking into AvroIO.read and AvroIO.readAll; both of them
use AvroSource. AvroIO.readAll is using SDF, and it's implemented with
ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking at
ReadAllViaFileBasedSource, I don't think it's necessary to use Source<?>; it
should be enough to have something like (KV<ReadableFile, OffsetRange>,
OutputReceiver<T>), as we have discussed in this thread, and that should be
fine for SMB as well. It would require duplicating code from AvroSource,
but in the end, I don't see it as a problem if AvroSource is going away.
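A plain Java interface of the shape suggested here, `(KV<ReadableFile, OffsetRange>, OutputReceiver<T>) -> void`, might look roughly like the sketch below. To keep it runnable without Beam, `FileRange` (in-memory bytes plus a half-open byte range) stands in for `KV<ReadableFile, OffsetRange>` and `java.util.function.Consumer` stands in for `OutputReceiver<T>`; `FileRange`, `RangeReader` and `TextRangeReader` are hypothetical names. Newline-delimited text is used only because it is short; an Avro reader would align on the container's sync markers instead of newlines.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical stand-in for KV<ReadableFile, OffsetRange>: file bytes plus a half-open [start, end) range. */
final class FileRange {
  final byte[] contents;
  final long start;
  final long end;

  FileRange(byte[] contents, long start, long end) {
    if (start < 0 || end < start || end > contents.length) {
      throw new IllegalArgumentException("invalid range [" + start + ", " + end + ")");
    }
    this.contents = contents;
    this.start = start;
    this.end = end;
  }
}

/** Plain Java interface in the spirit of (KV<ReadableFile, OffsetRange>, OutputReceiver<T>) -> void. */
@FunctionalInterface
interface RangeReader<T> {
  void read(FileRange range, Consumer<T> out);
}

/** Newline-delimited text: a line belongs to the range in which it starts. */
final class TextRangeReader implements RangeReader<String> {
  @Override
  public void read(FileRange range, Consumer<String> out) {
    byte[] b = range.contents;
    int pos = (int) range.start;
    // Skip a partial first line; the range in which that line starts emits it.
    while (pos > 0 && pos < b.length && b[pos - 1] != '\n') {
      pos++;
    }
    // Emit every line whose first byte lies in [start, end).
    while (pos < range.end && pos < b.length) {
      int lineEnd = pos;
      while (lineEnd < b.length && b[lineEnd] != '\n') {
        lineEnd++;
      }
      out.accept(new String(b, pos, lineEnd - pos, StandardCharsets.UTF_8));
      pos = lineEnd + 1;
    }
  }
}
```

The rule that a record belongs to the range in which it starts is what lets adjacent ranges cover a file exactly once: reading `[0, 8)` and `[8, 17)` of `"alpha\nbeta\ngamma\n"` yields `alpha, beta` and `gamma` respectively.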

I'm attaching a small diagram I put together for myself to better understand
the code.

AvroIO.readAll :: PTransform<PCollection<String>, PCollection<T>> ->

    FileIO.matchAll :: PTransform<PCollection<String>,
        PCollection<MatchResult.Metadata>>
    FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>,
        PCollection<ReadableFile>>
    AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>,
        PCollection<T>> ->

        ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>,
            PCollection<T>> ->

            ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile,
                KV<ReadableFile, OffsetRange>>) (splittable do fn)

            Reshuffle.viaRandomKey()

            ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile,
                OffsetRange>, T>) where

                createSource :: String -> FileBasedSource<T>

                createSource = AvroSource


AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin,
    PCollection<T>> ->

    Read.Bounded.from(createSource) where

        createSource :: String -> FileBasedSource<T>

        createSource = AvroSource
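The `SplitIntoRangesFn` step in the diagram produces the `KV<ReadableFile, OffsetRange>` pairs that are then reshuffled and read. A minimal JDK-only sketch of that splitting, assuming a fixed desired bundle size: `ByteRange` is a hypothetical stand-in for `OffsetRange`, `RangeSplitter` is a hypothetical name, and the `seekable` flag is an assumption about how files that cannot be read from an arbitrary offset (for example compressed ones) would be kept whole.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical half-open byte range [start, end), standing in for Beam's OffsetRange. */
final class ByteRange {
  final long start;
  final long end;

  ByteRange(long start, long end) {
    this.start = start;
    this.end = end;
  }

  @Override
  public String toString() {
    return "[" + start + ", " + end + ")";
  }
}

final class RangeSplitter {
  /**
   * Splits a file of fileSize bytes into consecutive ranges of at most
   * desiredBundleSizeBytes. A file that cannot be read from an arbitrary
   * offset is returned as a single range.
   */
  static List<ByteRange> split(long fileSize, long desiredBundleSizeBytes, boolean seekable) {
    if (desiredBundleSizeBytes <= 0) {
      throw new IllegalArgumentException("desiredBundleSizeBytes must be positive");
    }
    List<ByteRange> ranges = new ArrayList<>();
    if (!seekable) {
      ranges.add(new ByteRange(0, fileSize));
      return ranges;
    }
    for (long start = 0; start < fileSize; start += desiredBundleSizeBytes) {
      ranges.add(new ByteRange(start, Math.min(start + desiredBundleSizeBytes, fileSize)));
    }
    return ranges;
  }
}
```

Each range is independent, so after `Reshuffle.viaRandomKey()` any worker can read any range; the per-range reader is responsible for not emitting a record twice at range boundaries.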


Gleb


On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com> wrote:

> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <ke...@apache.org> wrote:
> >
> > From the peanut gallery, keeping a separate implementation for SMB seems
> fine. Dependencies are serious liabilities for both upstream and
> downstream. It seems like the reuse angle is generating extra work, and
> potentially making already-complex implementations more complex, instead of
> helping things.
>
> +1
>
> To be clear, what I care about is that WriteFiles(X) and
> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
> TFRecord, ...}. In other words composability of the API (vs. manually
> filling out the matrix). If WriteFiles and WriteSmbFiles find
> opportunities for (easy, clean) implementation sharing, that'd be
> nice, but not the primary goal.
>
> (Similarly for reading, though that seems less obvious. Certainly
> whatever T is useful for ReadSmb(T) could be useful for a
> (non-liquid-sharding) ReadAll(T) however.)
>
> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <ne...@gmail.com> wrote:
> >>
> >> I spoke too soon. Turns out for unsharded writes, numShards can't be
> determined until the last finalize transform, which is again different from
> the current SMB proposal (static number of buckets & shards).
> >> I'll end up with more code specialized for SMB in order to generalize
> existing sink code, which I think we all want to avoid.
> >>
> >> Seems the only option is duplicating some logic like temp file
> handling, which is exactly what we did in the original PR.
> >> I can reuse Compression & Sink<T> for file level writes but that seems
> about the most I can reuse right now.
> >>
> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com> wrote:
> >>>
> >>> So I spent one afternoon trying some ideas for reusing the last few
> >>> transforms of WriteFiles.
> >>>
> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
> Iterable<UserT>>, FileResult<DestinationT>>
> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
> PCollection<List<ResultT>>>
> >>> => FinalizeTempFileBundles extends
> PTransform<PCollection<List<FileResult<DestinationT>>>,
> WriteFilesResult<DestinationT>>
> >>>
> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId>
> >>> so I can use pre-computed SMB destination file names for the transforms.
> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
> private and easy to change/pull out.
> >>>
> >>> OTOH they are somewhat coupled with the package private
> >>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
> >>> temp file handling logic lives). Might be hard to decouple either by
> >>> modifying existing code or by creating new transforms, unless we rewrite
> >>> most of FileBasedSink from scratch.
> >>>
> >>> Let me know if I'm on the wrong track.
> >>>
> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
> >>>
> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <chamikara@google.com> wrote:
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <ro...@google.com> wrote:
> >>>>>
> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <kirpichov@google.com> wrote:
> >>>>> >
> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <robertwb@google.com> wrote:
> >>>>> >>
> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <ne...@gmail.com> wrote:
> >>>>> >> >
> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it
> and see what needs to be done.
> >>>>> >> >
> >>>>> >> > Eugene pointed out that we shouldn't build on
> FileBased{Source,Sink}. So for writes I'll probably build on top of
> WriteFiles.
> >>>>> >>
> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
> >>>>> >>
> >>>>> >>
> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
> >>>>> >
> >>>>> > Yeah if possible, parameterize FileIO.Sink.
> >>>>> > I would recommend against building on top of WriteFiles either.
> >>>>> > FileIO being implemented on top of WriteFiles was supposed to be a
> >>>>> > temporary measure - the longer-term plan was to rewrite it from scratch
> >>>>> > (albeit with a similar structure) and throw away WriteFiles.
> >>>>> > If possible, I would recommend pursuing this path: if there are
> >>>>> > parts of WriteFiles you want to reuse, I would recommend implementing
> >>>>> > them as new transforms, not at all tied to FileBasedSink (but ok if
> >>>>> > tied to FileIO.Sink), with the goal in mind that FileIO could be
> >>>>> > rewritten on top of these new transforms, or maybe parts of
> >>>>> > WriteFiles could be swapped out for them incrementally.
> >>>>>
> >>>>> Thanks for the feedback. There's a lot that was done, but looking at
> >>>>> the code it feels like there's a lot that was not yet done either, and
> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not finding
> >>>>> the right docs).
> >>>>
> >>>>
> >>>> I'm also a bit unfamiliar with the original plans for WriteFiles and for
> >>>> updating source interfaces, but I'd prefer not to significantly modify
> >>>> existing IO transforms to suit the SMB use case. If there are existing
> >>>> pieces of code that can be easily reused, that is fine, but existing
> >>>> sources/sinks are designed to perform a PCollection -> file transformation
> >>>> and vice versa with (usually) runner-determined sharding. Things specific
> >>>> to SMB, such as sharding restrictions, writing metadata to a separate file,
> >>>> and reading multiple files from the same abstraction, do not sound like
> >>>> features that should be included in our usual file read/write transforms.
> >>>>
> >>>>>
> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered
> elements across files within a bucket and TBH I'm not even sure where to
> start.
> >>>>> >>
> >>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
> >>>>> >> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
> >>>>> >> PCollection<T>> but such an interface is insufficient to extract
> >>>>> >> ordered records per shard. It seems the only concrete implementations
> >>>>> >> are based on FileBasedSource, which we'd like to avoid, but there's no
> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and
> >>>>> >> cumbersome to call (given the reflection machinery involved in
> >>>>> >> invoking DoFns).
> >>>>> >
> >>>>> > Seems easiest to just define a new regular Java interface for this.
> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something
> analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how
> much control over iteration you need.
> >>>>>
> >>>>> For this application, one wants to iterate over several files in
> >>>>> parallel. The downside of a new interface is that it shares almost
> >>>>> nothing with the "normal" sources (e.g. when features (or
> >>>>> optimizations) get added to one, they won't get added to the other).
> >>>>
> >>>>
> >>>>>
> >>>>>
> >>>>> > And yes, DoFns, including SDFs, are not designed to be used as Java
> >>>>> > interfaces per se. If you need DoFn machinery in this interface (e.g.
> >>>>> > side inputs), use Contextful - s.apache.org/context-fn.
> >>>>>
> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
> >>>>> to build new DoFns out of others (or, really, use them in any context
> >>>>> other than as an argument to ParDo).
> >>>>>
> >>>>> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
> >>>>> >>
> >>>>> >> Sounds good.
> >>>>
> >>>>
> >>>> +1
> >>>>
> >>>>>
> >>>>> >>
> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <robertwb@google.com> wrote:
> >>>>> >> >>
> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <neville.lyh@gmail.com> wrote:
> >>>>> >> >> >
> >>>>> >> >> > Forking this thread to discuss action items regarding the
> change. We can keep technical discussion in the original thread.
> >>>>> >> >> >
> >>>>> >> >> > Background: our SMB POC showed promising performance & cost
> saving improvements and we'd like to adopt it for production soon (by EOY).
> We want to contribute it to Beam so it's better generalized and maintained.
> We also want to avoid divergence between our internal version and the PR
> while it's in progress, specifically any breaking change in the produced
> SMB data.
> >>>>> >> >>
> >>>>> >> >> All good goals.
> >>>>> >> >>
> >>>>> >> >> > To achieve that I'd like to propose a few action items.
> >>>>> >> >> >
> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
> >>>>> >> >> > handling, bucket file and metadata format, etc., anything that
> >>>>> >> >> > affects produced SMB data.
> >>>>> >> >> > 2. Revise the existing PR according to #1
> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
> Compression, etc., but keep the existing file level abstraction
> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark
> clearly as @experimental
> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
> >>>>> >> >> >
> >>>>> >> >> > #1-4 gives us something usable in the short term, while #1
> guarantees that production data produced today are usable when #5 lands on
> master. #4 also gives early adopters a chance to give feedback.
> >>>>> >> >> > Due to the scope of #5, it might take much longer and a
> couple of big PRs to achieve, which we can keep iterating on.
> >>>>> >> >> >
> >>>>> >> >> > What are your thoughts on this?
> >>>>> >> >>
> >>>>> >> >> I would like to see some resolution on the FileIO abstractions before
> >>>>> >> >> merging into experimental. (We have a FileBasedSink that would mostly
> >>>>> >> >> already work, so it's a matter of coming up with an analogous Source
> >>>>> >> >> interface.) Specifically I would not want to merge a set of per file
> >>>>> >> >> type smb IOs without a path forward to this or the determination that
> >>>>> >> >> it's not possible/desirable.
>


-- 
Cheers,
Gleb

Re: Sort Merge Bucket - Action Items

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <ke...@apache.org> wrote:
>
> From the peanut gallery, keeping a separate implementation for SMB seems fine. Dependencies are serious liabilities for both upstream and downstream. It seems like the reuse angle is generating extra work, and potentially making already-complex implementations more complex, instead of helping things.

+1

To be clear, what I care about is that WriteFiles(X) and
WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
TFRecord, ...}. In other words composability of the API (vs. manually
filling out the matrix). If WriteFiles and WriteSmbFiles find
opportunities for (easy, clean) implementation sharing, that'd be
nice, but not the primary goal.

(Similarly for reading, though that seems less obvious. Certainly
whatever T is useful for ReadSmb(T) could be useful for a
(non-liquid-sharding) ReadAll(T) however.)
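The composability described here, one format X shared by WriteFiles(X) and WriteSmbFiles(X), can be sketched in plain Java. `RecordSink` below only mirrors the open/write/flush shape of `FileIO.Sink` and is not the Beam interface; `TextSink`, `Writers`, `writeFiles` and `writeSmbFiles` are hypothetical names, byte arrays stand in for output files, and bucketing by `hashCode()` is purely illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** JDK-only stand-in shaped like FileIO.Sink's open/write/flush; not the Beam interface. */
interface RecordSink<T> {
  void open(WritableByteChannel channel) throws IOException;
  void write(T element) throws IOException;
  void flush() throws IOException;
}

/** One concrete "X": newline-delimited text. */
final class TextSink implements RecordSink<String> {
  private WritableByteChannel channel;

  @Override
  public void open(WritableByteChannel channel) {
    this.channel = channel;
  }

  @Override
  public void write(String element) throws IOException {
    ByteBuffer buf = ByteBuffer.wrap((element + "\n").getBytes(StandardCharsets.UTF_8));
    while (buf.hasRemaining()) {
      channel.write(buf);
    }
  }

  @Override
  public void flush() {}
}

final class Writers {
  /** "WriteFiles(X)": all records to one output, in input order. */
  static <T> byte[] writeFiles(Supplier<? extends RecordSink<T>> format, List<T> records) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      RecordSink<T> sink = format.get();
      sink.open(Channels.newChannel(bytes));
      for (T record : records) {
        sink.write(record);
      }
      sink.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** "WriteSmbFiles(X)": same format, but bucketed by key and sorted within each bucket. */
  static <T, K extends Comparable<K>> List<byte[]> writeSmbFiles(
      Supplier<? extends RecordSink<T>> format, List<T> records, Function<T, K> keyFn, int numBuckets) {
    List<List<T>> buckets = new ArrayList<>();
    for (int i = 0; i < numBuckets; i++) {
      buckets.add(new ArrayList<>());
    }
    for (T record : records) {
      buckets.get(Math.floorMod(keyFn.apply(record).hashCode(), numBuckets)).add(record);
    }
    List<byte[]> outputs = new ArrayList<>();
    for (List<T> bucket : buckets) {
      bucket.sort(Comparator.comparing(keyFn));
      outputs.add(writeFiles(format, bucket));
    }
    return outputs;
  }
}
```

The format knows nothing about buckets or sort order, and the SMB writer knows nothing about encoding, so supporting another format would mean one more sink rather than another hand-written SMB IO in the matrix.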

> On Wed, Jul 24, 2019 at 11:59 AM Neville Li <ne...@gmail.com> wrote:
>>
>> I spoke too soon. Turns out for unsharded writes, numShards can't be determined until the last finalize transform, which is again different from the current SMB proposal (static number of buckets & shards).
>> I'll end up with more code specialized for SMB in order to generalize existing sink code, which I think we all want to avoid.
>>
>> Seems the only option is duplicating some logic like temp file handling, which is exactly what we did in the original PR.
>> I can reuse Compression & Sink<T> for file level writes but that seems about the most I can reuse right now.
>>
>> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com> wrote:
>>>
>>> So I spent one afternoon trying some ideas for reusing the last few transforms of WriteFiles.
>>>
>>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>>
>>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>>
>>> => FinalizeTempFileBundles extends PTransform<PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>
>>>
>>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so I can use pre-computed SMB destination file names for the transforms.
>>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are private and easy to change/pull out.
>>>
>>> OTOH they are somewhat coupled with the package private {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of temp file handling logic lives). Might be hard to decouple either by modifying existing code or by creating new transforms, unless we rewrite most of FileBasedSink from scratch.
>>>
>>> Let me know if I'm on the wrong track.
>>>
>>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>>>
>>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <ch...@google.com> wrote:
>>>>
>>>>
>>>>
>>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>>
>>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <ki...@google.com> wrote:
>>>>> >
>>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>> >>
>>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <ne...@gmail.com> wrote:
>>>>> >> >
>>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and see what needs to be done.
>>>>> >> >
>>>>> >> > Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So for writes I'll probably build on top of WriteFiles.
>>>>> >>
>>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>>>>> >>
>>>>> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>>>>> >
>>>>> > Yeah if possible, parameterize FileIO.Sink.
>>>>> > I would recommend against building on top of WriteFiles either. FileIO being implemented on top of WriteFiles was supposed to be a temporary measure - the longer-term plan was to rewrite it from scratch (albeit with a similar structure) and throw away WriteFiles.
>>>>> > If possible, I would recommend pursuing this path: if there are parts of WriteFiles you want to reuse, I would recommend implementing them as new transforms, not at all tied to FileBasedSink (but ok if tied to FileIO.Sink), with the goal in mind that FileIO could be rewritten on top of these new transforms, or maybe parts of WriteFiles could be swapped out for them incrementally.
>>>>>
>>>>> Thanks for the feedback. There's a lot that was done, but looking at
>>>>> the code it feels like there's a lot that was not yet done either, and
>>>>> the longer-term plan wasn't clear (though perhaps I'm just not finding
>>>>> the right docs).
>>>>
>>>>
>>>> I'm also a bit unfamiliar with the original plans for WriteFiles and for updating source interfaces, but I'd prefer not to significantly modify existing IO transforms to suit the SMB use case. If there are existing pieces of code that can be easily reused, that is fine, but existing sources/sinks are designed to perform a PCollection -> file transformation and vice versa with (usually) runner-determined sharding. Things specific to SMB, such as sharding restrictions, writing metadata to a separate file, and reading multiple files from the same abstraction, do not sound like features that should be included in our usual file read/write transforms.
>>>>
>>>>>
>>>>> >> > Read might be a bigger change w.r.t. collocating ordered elements across files within a bucket and TBH I'm not even sure where to start.
>>>>> >>
>>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>>>>> >> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>>>>> >> PCollection<T>> but such an interface is insufficient to extract
>>>>> >> ordered records per shard. It seems the only concrete implementations
>>>>> >> are based on FileBasedSource, which we'd like to avoid, but there's no
>>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>>>>> >> cumbersome to call (given the reflection machinery involved in
>>>>> >> invoking DoFns).
>>>>> >
>>>>> > Seems easiest to just define a new regular Java interface for this.
>>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how much control over iteration you need.
>>>>>
>>>>> For this application, one wants to iterate over several files in
>>>>> parallel. The downside of a new interface is that it shares almost
>>>>> nothing with the "normal" sources (e.g. when features (or
>>>>> optimizations) get added to one, they won't get added to the other).
>>>>
>>>>
>>>>>
>>>>>
>>>>> > And yes, DoFns, including SDFs, are not designed to be used as Java interfaces per se. If you need DoFn machinery in this interface (e.g. side inputs), use Contextful - s.apache.org/context-fn.
>>>>>
>>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>>>>> to build new DoFns out of others (or, really, use them in any context
>>>>> other than as an argument to ParDo).
>>>>>
>>>>> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
>>>>> >>
>>>>> >> Sounds good.
>>>>
>>>>
>>>> +1
>>>>
>>>>>
>>>>> >>
>>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>> >> >>
>>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <ne...@gmail.com> wrote:
>>>>> >> >> >
>>>>> >> >> > Forking this thread to discuss action items regarding the change. We can keep technical discussion in the original thread.
>>>>> >> >> >
>>>>> >> >> > Background: our SMB POC showed promising performance & cost saving improvements and we'd like to adopt it for production soon (by EOY). We want to contribute it to Beam so it's better generalized and maintained. We also want to avoid divergence between our internal version and the PR while it's in progress, specifically any breaking change in the produced SMB data.
>>>>> >> >>
>>>>> >> >> All good goals.
>>>>> >> >>
>>>>> >> >> > To achieve that I'd like to propose a few action items.
>>>>> >> >> >
>>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key handling, bucket file and metadata format, etc., anything that affects produced SMB data.
>>>>> >> >> > 2. Revise the existing PR according to #1
>>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., but keep the existing file level abstraction
>>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark clearly as @experimental
>>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>>>>> >> >> >
>>>>> >> >> > #1-4 gives us something usable in the short term, while #1 guarantees that production data produced today are usable when #5 lands on master. #4 also gives early adopters a chance to give feedback.
>>>>> >> >> > Due to the scope of #5, it might take much longer and a couple of big PRs to achieve, which we can keep iterating on.
>>>>> >> >> >
>>>>> >> >> > What are your thoughts on this?
>>>>> >> >>
>>>>> >> >> I would like to see some resolution on the FileIO abstractions before
>>>>> >> >> merging into experimental. (We have a FileBasedSink that would mostly
>>>>> >> >> already work, so it's a matter of coming up with an analogous Source
>>>>> >> >> interface.) Specifically I would not want to merge a set of per file
>>>>> >> >> type smb IOs without a path forward to this or the determination that
>>>>> >> >> it's not possible/desirable.
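Neville's WIP replaces `FileResult<DestinationT>` with `KV<DestinationT, ResourceId>` so that SMB destinations can be computed up front, while the temp-file handling ends up duplicated. The underlying write-to-temp-then-finalize pattern can be sketched with `java.nio.file`. `TempThenFinalize`, `PendingFile`, `writeTemp` and `finalizeAll` are hypothetical names, a local `Path` stands in for `ResourceId`, and the sketch omits what a real writer also needs (for example temp-directory cleanup and non-local filesystems).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

final class TempThenFinalize {
  /** A temp file paired with its pre-computed destination, in the spirit of KV<DestinationT, ResourceId>. */
  static final class PendingFile {
    final Path temp;
    final Path destination;

    PendingFile(Path temp, Path destination) {
      this.temp = temp;
      this.destination = destination;
    }
  }

  /** Writes one bucket's records to a fresh temp file; the destination is not touched yet. */
  static PendingFile writeTemp(Path tempDir, Path destination, List<String> lines) {
    try {
      Path temp = Files.createTempFile(tempDir, "smb-", ".tmp");
      Files.write(temp, lines, StandardCharsets.UTF_8);
      return new PendingFile(temp, destination);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Intended to run only after every temp write has succeeded; moves each file into place. */
  static void finalizeAll(List<PendingFile> pending) {
    try {
      for (PendingFile p : pending) {
        Path parent = p.destination.getParent();
        if (parent != null) {
          Files.createDirectories(parent);
        }
        // REPLACE_EXISTING lets a re-run of the whole write overwrite outputs from an earlier attempt.
        Files.move(p.temp, p.destination, StandardCopyOption.REPLACE_EXISTING);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because destinations are fixed in advance, finalization is just one move per file, and a failure during the temp writes leaves nothing at any destination.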

Re: Sort Merge Bucket - Action Items

Posted by Kenneth Knowles <ke...@apache.org>.
From the peanut gallery, keeping a separate implementation for SMB seems
fine. Dependencies are serious liabilities for both upstream and
downstream. It seems like the reuse angle is generating extra work, and
potentially making already-complex implementations more complex, instead of
helping things.

Kenn
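The SMB-specific read step discussed in the quoted thread, iterating over several key-sorted files in parallel, is essentially a k-way merge. A minimal sketch using a `PriorityQueue`, assuming every input is already sorted by the same key (an unsorted input produces wrong groups rather than an error); `SortedMerge` and `coGroup` are hypothetical names, and plain iterators stand in for per-file readers of one bucket.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;
import java.util.function.Function;

final class SortedMerge {
  /** Cursor over one key-sorted input, remembering its current head element. */
  private static final class Cursor<T> {
    final int source;
    final Iterator<T> it;
    T head;
    boolean hasHead;

    Cursor(int source, Iterator<T> it) {
      this.source = source;
      this.it = it;
      advance();
    }

    void advance() {
      hasHead = it.hasNext();
      head = hasHead ? it.next() : null;
    }
  }

  /**
   * Merges inputs that are each sorted by key and, for every key, emits the
   * matching records from each input (list index i holds input i's records).
   * Only the records of the current key are buffered.
   */
  static <T, K extends Comparable<K>> void coGroup(
      List<Iterator<T>> sortedInputs, Function<T, K> keyFn, BiConsumer<K, List<List<T>>> out) {
    PriorityQueue<Cursor<T>> heap =
        new PriorityQueue<>(Comparator.comparing((Cursor<T> c) -> keyFn.apply(c.head)));
    for (int i = 0; i < sortedInputs.size(); i++) {
      Cursor<T> c = new Cursor<>(i, sortedInputs.get(i));
      if (c.hasHead) {
        heap.add(c);
      }
    }
    while (!heap.isEmpty()) {
      K key = keyFn.apply(heap.peek().head);
      List<List<T>> groups = new ArrayList<>();
      for (int i = 0; i < sortedInputs.size(); i++) {
        groups.add(new ArrayList<>());
      }
      // Drain the run of records with this key from every input that has one.
      while (!heap.isEmpty() && keyFn.apply(heap.peek().head).compareTo(key) == 0) {
        Cursor<T> c = heap.poll();
        do {
          groups.get(c.source).add(c.head);
          c.advance();
        } while (c.hasHead && keyFn.apply(c.head).compareTo(key) == 0);
        if (c.hasHead) {
          heap.add(c); // re-insert with its new head so the heap order stays valid
        }
      }
      out.accept(key, groups);
    }
  }
}
```

Since both sides were bucketed and sorted when written, a join built on this merge needs no shuffle at read time, which is the premise of sort merge bucket.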

On Wed, Jul 24, 2019 at 11:59 AM Neville Li <ne...@gmail.com> wrote:

> I spoke too soon. Turns out for unsharded writes, numShards can't be
> determined until the last finalize transform, which is again different from
> the current SMB proposal (static number of buckets & shards).
> I'll end up with more code specialized for SMB in order to generalize
> existing sink code, which I think we all want to avoid.
>
> Seems the only option is duplicating some logic like temp file handling,
> which is exactly what we did in the original PR.
> I can reuse Compression & Sink<T> for file level writes but that seems
> about the most I can reuse right now.
>
> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com> wrote:
>
>> So I spent one afternoon trying some ideas for reusing the last few
>> transforms of WriteFiles.
>>
>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
>> Iterable<UserT>>, FileResult<DestinationT>>
>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
>> PCollection<List<ResultT>>>
>> => FinalizeTempFileBundles extends PTransform<PCollection<List<
>> FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>
>>
>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so
>> I can use pre-computed SMB destination file names for the transforms.
>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
>> private and easy to change/pull out.
>>
>> OTOH they are somewhat coupled with the package private
>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
>> temp file handling logic lives). Might be hard to decouple either by
>> modifying existing code or by creating new transforms, unless we rewrite
>> most of FileBasedSink from scratch.
>>
>> Let me know if I'm on the wrong track.
>>
>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>>
>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <ch...@google.com> wrote:
>>
>>>
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <ro...@google.com> wrote:
>>>
>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <ki...@google.com> wrote:
>>>> >
>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <ro...@google.com> wrote:
>>>> >>
>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <ne...@gmail.com> wrote:
>>>> >> >
>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and
>>>> see what needs to be done.
>>>> >> >
>>>> >> > Eugene pointed out that we shouldn't build on
>>>> FileBased{Source,Sink}. So for writes I'll probably build on top of
>>>> WriteFiles.
>>>> >>
>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>>>> >>
>>>> >>
>>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>>>> >
>>>> > Yeah if possible, parameterize FileIO.Sink.
>>>> > I would recommend against building on top of WriteFiles either.
>>>> > FileIO being implemented on top of WriteFiles was supposed to be a
>>>> > temporary measure - the longer-term plan was to rewrite it from scratch
>>>> > (albeit with a similar structure) and throw away WriteFiles.
>>>> > If possible, I would recommend pursuing this path: if there are
>>>> > parts of WriteFiles you want to reuse, I would recommend implementing
>>>> > them as new transforms, not at all tied to FileBasedSink (but ok if
>>>> > tied to FileIO.Sink), with the goal in mind that FileIO could be
>>>> > rewritten on top of these new transforms, or maybe parts of
>>>> > WriteFiles could be swapped out for them incrementally.
>>>>
>>>> Thanks for the feedback. There's a lot that was done, but looking at
>>>> the code it feels like there's a lot that was not yet done either, and
>>>> the longer-term plan wasn't clear (though perhaps I'm just not finding
>>>> the right docs).
>>>>
>>>
>>> I'm also a bit unfamiliar with the original plans for WriteFiles and for
>>> updating source interfaces, but I'd prefer not to significantly modify
>>> existing IO transforms to suit the SMB use case. If there are existing
>>> pieces of code that can be easily reused, that is fine, but existing
>>> sources/sinks are designed to perform a PCollection -> file transformation
>>> and vice versa with (usually) runner-determined sharding. Things specific
>>> to SMB, such as sharding restrictions, writing metadata to a separate file,
>>> and reading multiple files from the same abstraction, do not sound like
>>> features that should be included in our usual file read/write transforms.
>>>
>>>
>>>> >> > Read might be a bigger change w.r.t. collocating ordered elements
>>>> across files within a bucket and TBH I'm not even sure where to start.
>>>> >>
>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>>>> >> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>>>> >> PCollection<T>> but such an interface is insufficient to extract
>>>> >> ordered records per shard. It seems the only concrete implementations
>>>> >> are based on FileBasedSource, which we'd like to avoid, but there's no
>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>>>> >> cumbersome to call (given the reflection machinery involved in
>>>> >> invoking DoFns).
>>>> >
>>>> > Seems easiest to just define a new regular Java interface for this.
>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something
>>>> analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how
>>>> much control over iteration you need.
>>>>
>>>> For this application, one wants to iterate over several files in
>>>> parallel. The downside of a new interface is that it shares almost
>>>> nothing with the "normal" sources (e.g. when features (or
>>>> optimizations) get added to one, they won't get added to the other).
>>>
>>>
>>>
>>>>
>>>> > And yes, DoFns, including SDFs, are not designed to be used as Java
>>>> > interfaces per se. If you need DoFn machinery in this interface (e.g.
>>>> > side inputs), use Contextful - s.apache.org/context-fn.
>>>>
>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>>>> to build new DoFns out of others (or, really, use them in any context
>>>> other than as an argument to ParDo).
>>>>
>>>> >> > I'll file separate PRs for core changes needed for discussion.
>>>> WDYT?
>>>> >>
>>>> >> Sounds good.
>>>>
>>>
>>> +1
>>>
>>>
>>>> >>
>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <
>>>> robertwb@google.com> wrote:
>>>> >> >>
>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <ne...@gmail.com>
>>>> wrote:
>>>> >> >> >
>>>> >> >> > Forking this thread to discuss action items regarding the
>>>> change. We can keep technical discussion in the original thread.
>>>> >> >> >
>>>> >> >> > Background: our SMB POC showed promising performance & cost
>>>> saving improvements and we'd like to adopt it for production soon (by EOY).
>>>> We want to contribute it to Beam so it's better generalized and maintained.
>>>> We also want to avoid divergence between our internal version and the PR
>>>> while it's in progress, specifically any breaking change in the produced
>>>> SMB data.
>>>> >> >>
>>>> >> >> All good goals.
>>>> >> >>
>>>> >> >> > To achieve that I'd like to propose a few action items.
>>>> >> >> >
>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
>>>> handling, bucket file and metadata format, etc., anything that affect
>>>> produced SMB data.
>>>> >> >> > 2. Revise the existing PR according to #1
>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
>>>> Compression, etc., but keep the existing file level abstraction
>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark clearly
>>>> as @experimental
>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
>>>> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>>>> >> >> >
>>>> >> >> > #1-4 gives us something usable in the short term, while #1
>>>> guarantees that production data produced today are usable when #5 lands on
>>>> master. #4 also gives early adopters a chance to give feedback.
>>>> >> >> > Due to the scope of #5, it might take much longer and a couple
>>>> of big PRs to achieve, which we can keep iterating on.
>>>> >> >> >
>>>> >> >> > What are your thoughts on this?
>>>> >> >>
>>>> >> >> I would like to see some resolution on the FileIO abstractions
>>>> before
>>>> >> >> merging into experimental. (We have a FileBasedSink that would
>>>> mostly
>>>> >> >> already work, so it's a matter of coming up with an analogous
>>>> Source
>>>> >> >> interface.) Specifically I would not want to merge a set of per
>>>> file
>>>> >> >> type smb IOs without a path forward to this or the determination
>>>> that
>>>> >> >> it's not possible/desirable.
>>>>
>>>

Re: Sort Merge Bucket - Action Items

Posted by Neville Li <ne...@gmail.com>.
I spoke too soon. Turns out for unsharded writes, numShards can't be
determined until the last finalize transform, which is again different from
the current SMB proposal (static number of buckets & shards).
I'll end up with more code specialized for SMB in order to generalize
existing sink code, which I think we all want to avoid.

Seems the only option is duplicating some logic like temp file handling,
which is exactly what we did in the original PR.
I can reuse Compression & Sink<T> for file level writes but that seems
about the most I can reuse right now.
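To make the contrast concrete: with runner-determined sharding the shard count is only known in the final transform, whereas SMB fixes the number of buckets and shards up front, so every record's destination file can be computed before anything is written. A minimal sketch under stated assumptions: `Arrays.hashCode` over the encoded key is used as a stand-in hash (a real SMB implementation would likely use a stronger stable hash), and the `bucket-XXXXX-of-YYYYY-shard-...` naming scheme and `StaticBucketing` class are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper: static bucket/shard assignment for SMB-style writes. */
final class StaticBucketing {
  private StaticBucketing() {}

  /** Maps an encoded key to a bucket in [0, numBuckets); the same key always lands in the same bucket. */
  static int bucketFor(byte[] encodedKey, int numBuckets) {
    // Arrays.hashCode is fully specified, so it is stable across JVMs (stand-in for a real hash).
    // floorMod keeps the bucket non-negative even when the hash is negative.
    return Math.floorMod(Arrays.hashCode(encodedKey), numBuckets);
  }

  /** Destination name that is known before writing, because numBuckets and numShards are fixed. */
  static String fileName(int bucket, int numBuckets, int shard, int numShards, String suffix) {
    return String.format("bucket-%05d-of-%05d-shard-%05d-of-%05d%s",
        bucket, numBuckets, shard, numShards, suffix);
  }

  /** Encodes a String key as UTF-8 bytes for hashing. */
  static byte[] utf8(String key) {
    return key.getBytes(StandardCharsets.UTF_8);
  }
}
```

Because `fileName` depends only on fixed parameters, a finalize step could move temp files straight to these names instead of waiting for a runner-chosen shard count.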

On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com> wrote:

> So I spent one afternoon trying some ideas for reusing the last few
> transforms in WriteFiles:
>
> WriteShardsIntoTempFilesFn extends DoFn<KV<*ShardedKey<Integer>*,
> Iterable<UserT>>, *FileResult*<DestinationT>>
> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
> PCollection<List<ResultT>>>
> => FinalizeTempFileBundles extends PTransform<PCollection<List<
> *FileResult<DestinationT>*>>, WriteFilesResult<DestinationT>>
>
> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so I
> can use pre-computed SMB destination file names for the transforms.
> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
> private and easy to change/pull out.
>
> OTOH they are somewhat coupled with the package-private
> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
> temp file handling logic lives). It might be hard to decouple them, either by
> modifying existing code or by creating new transforms, unless we rewrite
> most of FileBasedSink from scratch.
>
> Let me know if I'm on the wrong track.
>
> WIP Branch https://github.com/spotify/beam/tree/neville/write-files

Re: Sort Merge Bucket - Action Items

Posted by Neville Li <ne...@gmail.com>.
So I spent one afternoon trying some ideas for reusing the last few
transforms in WriteFiles:

WriteShardsIntoTempFilesFn extends DoFn<KV<*ShardedKey<Integer>*,
Iterable<UserT>>, *FileResult*<DestinationT>>
=> GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
PCollection<List<ResultT>>>
=> FinalizeTempFileBundles extends PTransform<PCollection<List<
*FileResult<DestinationT>*>>, WriteFilesResult<DestinationT>>

I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so I
can use pre-computed SMB destination file names for the transforms.
I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
private and easy to change/pull out.

OTOH they are somewhat coupled with the package-private
{Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
temp file handling logic lives). It might be hard to decouple them, either by
modifying existing code or by creating new transforms, unless we rewrite
most of FileBasedSink from scratch.

Let me know if I'm on the wrong track.

WIP Branch https://github.com/spotify/beam/tree/neville/write-files
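The replacement of `FileResult<DestinationT>` with a pre-computed `KV<DestinationT, ResourceId>` described above means the destination of every bucket/shard is known before writing, so finalization only has to move temp files into place. A minimal plain-Java sketch of that finalize step, not the code on the WIP branch: `java.nio.file.Path` stands in for Beam's `ResourceId`, and the hypothetical `BucketShardId` stands in for a parameterized `ShardedKey`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical key replacing ShardedKey<Integer>: identifies one bucket/shard output file. */
final class BucketShardId {
  final int bucket;
  final int shard;

  BucketShardId(int bucket, int shard) {
    this.bucket = bucket;
    this.shard = shard;
  }

  @Override public boolean equals(Object o) {
    if (!(o instanceof BucketShardId)) return false;
    BucketShardId other = (BucketShardId) o;
    return bucket == other.bucket && shard == other.shard;
  }

  @Override public int hashCode() {
    return Objects.hash(bucket, shard);
  }
}

final class TempFileFinalizer {
  private TempFileFinalizer() {}

  /**
   * Moves each temp file to the destination pre-computed for its bucket/shard and
   * returns the final paths. Fails if a temp file has no known destination.
   */
  static List<Path> finalizeFiles(Map<BucketShardId, Path> tempFiles,
                                  Map<BucketShardId, Path> destinations) throws IOException {
    List<Path> written = new ArrayList<>();
    for (Map.Entry<BucketShardId, Path> e : tempFiles.entrySet()) {
      Path dst = destinations.get(e.getKey());
      if (dst == null) {
        throw new IllegalStateException("No destination for bucket " + e.getKey().bucket
            + ", shard " + e.getKey().shard);
      }
      // REPLACE_EXISTING overwrites a stale output left behind by an earlier failed attempt.
      written.add(Files.move(e.getValue(), dst, StandardCopyOption.REPLACE_EXISTING));
    }
    return written;
  }
}
```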

On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <ch...@google.com>
wrote:

> I'm also a bit unfamiliar with the original plans for WriteFiles and for
> updating source interfaces, but I'd prefer not to significantly modify
> existing IO transforms to suit the SMB use case. If there are existing
> pieces of code that can be easily reused, that is fine, but existing
> sources/sinks are designed to perform a PCollection -> file transformation
> and vice versa with (usually) runner-determined sharding. Things specific
> to SMB, such as sharding restrictions, writing metadata to a separate file,
> and reading multiple files from the same abstraction, do not sound like
> features that should be included in our usual file read/write transforms.

Re: Sort Merge Bucket - Action Items

Posted by Chamikara Jayalath <ch...@google.com>.
On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <ro...@google.com> wrote:

> Thanks for the feedback. There's a lot that was done, but looking at
> the code it feels like there's a lot that was not yet done either, and
> the longer-term plan wasn't clear (though perhaps I'm just not finding
> the right docs).
>

I'm also a bit unfamiliar with the original plans for WriteFiles and for
updating source interfaces, but I'd prefer not to significantly modify
existing IO transforms to suit the SMB use case. If there are existing
pieces of code that can be easily reused, that is fine, but existing
sources/sinks are designed to perform a PCollection -> file transformation
and vice versa with (usually) runner-determined sharding. Things specific
to SMB, such as sharding restrictions, writing metadata to a separate file,
and reading multiple files from the same abstraction, do not sound like
features that should be included in our usual file read/write transforms.
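One of the SMB-specific concerns listed above is writing metadata to a separate file. A minimal sketch, assuming a hypothetical `java.util.Properties`-based format holding the bucket count, shard count and hash type (the actual metadata format was still to be agreed on in this thread), so that a reader can check two inputs before merge-joining them:

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.Properties;

/** Hypothetical SMB metadata, stored next to the bucket files rather than inside them. */
final class BucketMetadata {
  final int numBuckets;
  final int numShards;
  final String hashType;

  BucketMetadata(int numBuckets, int numShards, String hashType) {
    this.numBuckets = numBuckets;
    this.numShards = numShards;
    this.hashType = hashType;
  }

  /** Serializes the metadata as key=value lines. */
  void writeTo(Writer out) throws IOException {
    Properties p = new Properties();
    p.setProperty("numBuckets", Integer.toString(numBuckets));
    p.setProperty("numShards", Integer.toString(numShards));
    p.setProperty("hashType", hashType);
    p.store(out, "SMB bucket metadata (sketch)");
  }

  /** Parses metadata previously written by writeTo. */
  static BucketMetadata readFrom(Reader in) throws IOException {
    Properties p = new Properties();
    p.load(in);
    return new BucketMetadata(
        Integer.parseInt(p.getProperty("numBuckets")),
        Integer.parseInt(p.getProperty("numShards")),
        p.getProperty("hashType"));
  }

  /**
   * Deliberately strict rule for this sketch: same hash function and same bucket count.
   * Shard counts may differ, since shards only split a bucket across several files.
   */
  boolean isCompatibleForJoin(BucketMetadata other) {
    return hashType.equals(other.hashType) && numBuckets == other.numBuckets;
  }
}
```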


> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
> >>
> >> Sounds good.
>

+1


Re: Sort Merge Bucket - Action Items

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <ki...@google.com> wrote:
>
> On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <ne...@gmail.com> wrote:
>> >
>> > Thanks Robert. Agree with the FileIO point. I'll look into it and see what needs to be done.
>> >
>> > Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So for writes I'll probably build on top of WriteFiles.
>>
>> Meaning it could be parameterized by FileIO.Sink, right?
>>
>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>
> Yeah if possible, parameterize FileIO.Sink.
> I would recommend against building on top of WriteFiles either. FileIO being implemented on top of WriteFiles was supposed to be a temporary measure - the longer-term plan was to rewrite it from scratch (albeit with a similar structure) and throw away WriteFiles.
> If possible, I would recommend pursuing this path: if there are parts of WriteFiles you want to reuse, I would recommend implementing them as new transforms, not at all tied to FileBasedSink (but OK if tied to FileIO.Sink), with the goal in mind that FileIO could be rewritten on top of these new transforms, or maybe parts of WriteFiles could be swapped out for them incrementally.

Thanks for the feedback. There's a lot that was done, but looking at
the code it feels like there's a lot that was not yet done either, and
the longer-term plan wasn't clear (though perhaps I'm just not finding
the right docs).

>> > Read might be a bigger change w.r.t. collocating ordered elements across files within a bucket and TBH I'm not even sure where to start.
>>
>> Yeah, here we need an interface that gives us ReadableFile ->
>> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>> PCollection<T>> but such an interface is insufficient to extract
>> ordered records per shard. It seems the only concrete implementations
>> are based on FileBasedSource, which we'd like to avoid, but there's no
>> alternative. An SDF, if exposed, would likely be overkill and
>> cumbersome to call (given the reflection machinery involved in
>> invoking DoFns).
>
> Seems easiest to just define a new regular Java interface for this.
> Could be either, indeed, ReadableFile -> Iterable<T>, or something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how much control over iteration you need.

For this application, one wants to iterate over several files in
parallel. The downside of a new interface is that it shares almost
nothing with the "normal" sources (e.g. when features (or
optimizations) get added to one, they won't get added to the other).
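For sorted inputs, iterating over several files in parallel amounts to a k-way merge. A minimal sketch, assuming each per-file iterator (e.g. one per shard of a bucket) is already sorted by key; `SortedMergeIterator` is a hypothetical name, and `Iterator<T>` stands in for whatever per-file reader interface is eventually chosen.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

/** Hypothetical k-way merge over per-file iterators that are each sorted by `order`. */
final class SortedMergeIterator<T> implements Iterator<T> {
  /** Current head element of one input, plus the input it came from. */
  private static final class Head<E> {
    final E value;
    final Iterator<E> source;

    Head(E value, Iterator<E> source) {
      this.value = value;
      this.source = source;
    }
  }

  private final PriorityQueue<Head<T>> heads;

  SortedMergeIterator(List<Iterator<T>> inputs, Comparator<? super T> order) {
    Comparator<Head<T>> byValue = (a, b) -> order.compare(a.value, b.value);
    heads = new PriorityQueue<>(Math.max(1, inputs.size()), byValue);
    for (Iterator<T> in : inputs) {
      if (in.hasNext()) heads.add(new Head<>(in.next(), in));
    }
  }

  @Override public boolean hasNext() {
    return !heads.isEmpty();
  }

  @Override public T next() {
    Head<T> smallest = heads.poll();
    if (smallest == null) throw new NoSuchElementException();
    // Refill from the same input so the queue keeps one pending head per non-empty input.
    if (smallest.source.hasNext()) {
      heads.add(new Head<>(smallest.source.next(), smallest.source));
    }
    return smallest.value;
  }
}
```

Each `next()` costs O(log k) for k inputs, and the queue holds at most one pending record per file.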

> And yes, DoFns, including SDFs, are not designed to be used as Java interfaces per se. If you need DoFn machinery in this interface (e.g. side inputs), use Contextful - s.apache.org/context-fn.

Yeah, one of the primary downsides to the NewDoFns is how hard it is
to build new DoFns out of others (or, really, use them in any context
other than as an argument to ParDo).

>> > I'll file separate PRs for core changes needed for discussion. WDYT?
>>
>> Sounds good.
>>
>> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <ro...@google.com> wrote:
>> >>
>> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <ne...@gmail.com> wrote:
>> >> >
>> >> > Forking this thread to discuss action items regarding the change. We can keep technical discussion in the original thread.
>> >> >
>> >> > Background: our SMB POC showed promising performance & cost saving improvements and we'd like to adopt it for production soon (by EOY). We want to contribute it to Beam so it's better generalized and maintained. We also want to avoid divergence between our internal version and the PR while it's in progress, specifically any breaking change in the produced SMB data.
>> >>
>> >> All good goals.
>> >>
>> >> > To achieve that I'd like to propose a few action items.
>> >> >
>> >> > 1. Reach a consensus about bucket and shard strategy, key handling, bucket file and metadata format, etc., anything that affects produced SMB data.
>> >> > 2. Revise the existing PR according to #1
>> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., but keep the existing file-level abstraction
>> >> > 4. (Optional) Merge code into extensions::smb but mark clearly as @Experimental
>> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>> >> >
>> >> > #1-4 give us something usable in the short term, while #1 guarantees that production data produced today remain usable when #5 lands on master. #4 also gives early adopters a chance to give feedback.
>> >> > Due to the scope of #5, it might take much longer and a couple of big PRs to achieve, which we can keep iterating on.
>> >> >
>> >> > What are your thoughts on this?
>> >>
>> >> I would like to see some resolution on the FileIO abstractions before
>> >> merging into experimental. (We have a FileBasedSink that would mostly
>> >> already work, so it's a matter of coming up with an analogous Source
>> >> interface.) Specifically, I would not want to merge a set of per-file-type
>> >> SMB IOs without a path forward to this, or the determination that
>> >> it's not possible/desirable.

Re: Sort Merge Bucket - Action Items

Posted by Eugene Kirpichov <ki...@google.com>.
On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <ro...@google.com> wrote:

> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <ne...@gmail.com> wrote:
> >
> > Thanks Robert. Agree with the FileIO point. I'll look into it and see
> what needs to be done.
> >
> > Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So
> for writes I'll probably build on top of WriteFiles.
>
> Meaning it could be parameterized by FileIO.Sink, right?
>
>
> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779

Yeah if possible, parameterize FileIO.Sink.
I would recommend against building on top of WriteFiles either. FileIO
being implemented on top of WriteFiles was supposed to be a temporary
measure - the longer-term plan was to rewrite it from scratch (albeit with
a similar structure) and throw away WriteFiles.
If possible, I would recommend pursuing this path: if there are parts of
WriteFiles you want to reuse, implement them as new transforms that are not
tied to FileBasedSink at all (tying them to FileIO.Sink is OK), with the
goal that FileIO could be rewritten on top of these new transforms, or that
parts of WriteFiles could be swapped out for them incrementally.
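To make concrete what such a standalone write transform would have to do before handing records to a FileIO.Sink, here is a minimal sketch of the core SMB write step: assign each record to a bucket by key, then sort each bucket by key. It assumes hash-mod bucketing on String keys via Java's `String.hashCode()`; a real layout would need a stable, well-specified hash, and the actual bucket/shard strategy is action item #1, still under discussion. `SmbBucketing`, `bucketFor`, and `bucketAndSort` are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/**
 * Hypothetical sketch of the SMB write step: bucket records by a hash of their key,
 * then sort each bucket by key. Hash-mod bucketing is an assumption (see item #1).
 */
final class SmbBucketing {
  private SmbBucketing() {}

  /** Bucket id in [0, numBuckets); floorMod keeps it non-negative for negative hashes. */
  static int bucketFor(String key, int numBuckets) {
    return Math.floorMod(key.hashCode(), numBuckets);
  }

  /** Groups records into buckets keyed by bucket id, each bucket sorted by key. */
  static <T> Map<Integer, List<T>> bucketAndSort(
      List<T> records, Function<T, String> keyFn, int numBuckets) {
    Map<Integer, List<T>> buckets = new TreeMap<>();
    for (T record : records) {
      int bucket = bucketFor(keyFn.apply(record), numBuckets);
      buckets.computeIfAbsent(bucket, b -> new ArrayList<>()).add(record);
    }
    // Sorting within each bucket lets readers later merge same-numbered buckets
    // from different datasets without a shuffle. List.sort is stable.
    Comparator<T> byKey = Comparator.comparing(keyFn);
    for (List<T> bucket : buckets.values()) {
      bucket.sort(byKey);
    }
    return buckets;
  }
}
```

Each resulting bucket would then be written as one (or more) sorted files, which is the part that could be parameterized by FileIO.Sink.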


>
>
> > Read might be a bigger change w.r.t. collocating ordered elements across
> files within a bucket and TBH I'm not even sure where to start.
>
> Yeah, here we need an interface that gives us ReadableFile ->
> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
> PCollection<T>> but such an interface is insufficient to extract
> ordered records per shard. It seems the only concrete implementations
> are based on FileBasedSource, which we'd like to avoid, but there's no
> alternative. An SDF, if exposed, would likely be overkill and
> cumbersome to call (given the reflection machinery involved in
> invoking DoFns).
>
It seems easiest to just define a new regular Java interface for this.
It could be either ReadableFile -> Iterable<T>, as you say, or something
analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void, depending on how
much control over iteration you need.
And yes, DoFns, including SDFs, are not designed to be used as Java
interfaces per se. If you need DoFn machinery in this interface (e.g. side
inputs), use Contextful - s.apache.org/context-fn.
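The two interface shapes suggested above (pull-style ReadableFile -> Iterable<T> and push-style (ReadableFile, OutputReceiver<T>) -> void) could look roughly like this. This is a minimal sketch: `FileRecordReader` and `LineReader` are hypothetical names, `InputStream` stands in for Beam's `ReadableFile` and `Consumer<T>` for `OutputReceiver<T>` so the example compiles without Beam, and the example reader loads the file eagerly, whereas a production reader would likely stream records.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical per-file reader interface. InputStream stands in for ReadableFile and
 * Consumer<T> for OutputReceiver<T>; neither substitution is Beam's actual API.
 */
interface FileRecordReader<T> {
  /** Pull-style: the caller controls iteration over the file's records, in file order. */
  Iterable<T> read(InputStream in) throws IOException;

  /** Push-style: the reader drives iteration; this default just delegates to read(). */
  default void readInto(InputStream in, Consumer<T> out) throws IOException {
    for (T record : read(in)) {
      out.accept(record);
    }
  }
}

/** Example implementation for newline-delimited UTF-8 text, preserving on-disk order. */
final class LineReader implements FileRecordReader<String> {
  @Override
  public Iterable<String> read(InputStream in) throws IOException {
    List<String> lines = new ArrayList<>();
    try (BufferedReader r =
        new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
      String line;
      while ((line = r.readLine()) != null) {
        lines.add(line);
      }
    }
    return lines;
  }
}
```

Because the interface returns records in file order, an SMB reader can rely on the per-file sort order, which a PTransform<PCollection<ReadableFile>, PCollection<T>> cannot guarantee.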


>
> > I'll file separate PRs for core changes needed for discussion. WDYT?
>
> Sounds good.
>
> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <ne...@gmail.com>
> wrote:
> >> >
> >> > Forking this thread to discuss action items regarding the change. We
> can keep technical discussion in the original thread.
> >> >
> >> > Background: our SMB POC showed promising performance & cost saving
> improvements and we'd like to adopt it for production soon (by EOY). We
> want to contribute it to Beam so it's better generalized and maintained. We
> also want to avoid divergence between our internal version and the PR while
> it's in progress, specifically any breaking change in the produced SMB data.
> >>
> >> All good goals.
> >>
> >> > To achieve that I'd like to propose a few action items.
> >> >
> >> > 1. Reach a consensus about bucket and shard strategy, key handling,
> bucket file and metadata format, etc., anything that affects produced SMB
> data.
> >> > 2. Revise the existing PR according to #1
> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
> Compression, etc., but keep the existing file level abstraction
> >> > 4. (Optional) Merge code into extensions::smb but mark clearly as
> @experimental
> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
> >> >
> >> > #1-4 gives us something usable in the short term, while #1 guarantees
> that production data produced today are usable when #5 lands on master. #4
> also gives early adopters a chance to give feedback.
> >> > Due to the scope of #5, it might take much longer and a couple of big
> PRs to achieve, which we can keep iterating on.
> >> >
> >> > What are your thoughts on this?
> >>
> >> I would like to see some resolution on the FileIO abstractions before
> >> merging into experimental. (We have a FileBasedSink that would mostly
> >> already work, so it's a matter of coming up with an analogous Source
> >> interface.) Specifically, I would not want to merge a set of per-file-type
> >> SMB IOs without a path forward to this or the determination that
> >> it's not possible/desirable.
>

Re: Sort Merge Bucket - Action Items

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Jul 22, 2019 at 4:04 PM Neville Li <ne...@gmail.com> wrote:
>
> Thanks Robert. Agree with the FileIO point. I'll look into it and see what needs to be done.
>
> Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So for writes I'll probably build on top of WriteFiles.

Meaning it could be parameterized by FileIO.Sink, right?

https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779

> Read might be a bigger change w.r.t. collocating ordered elements across files within a bucket and TBH I'm not even sure where to start.

Yeah, here we need an interface that gives us ReadableFile ->
Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
PCollection<T>> but such an interface is insufficient to extract
ordered records per shard. It seems the only concrete implementations
are based on FileBasedSource, which we'd like to avoid, but there's no
alternative. An SDF, if exposed, would likely be overkill and
cumbersome to call (given the reflection machinery involved in
invoking DoFns).

> I'll file separate PRs for core changes needed for discussion. WDYT?

Sounds good.

> On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <ne...@gmail.com> wrote:
>> >
>> > Forking this thread to discuss action items regarding the change. We can keep technical discussion in the original thread.
>> >
>> > Background: our SMB POC showed promising performance & cost saving improvements and we'd like to adopt it for production soon (by EOY). We want to contribute it to Beam so it's better generalized and maintained. We also want to avoid divergence between our internal version and the PR while it's in progress, specifically any breaking change in the produced SMB data.
>>
>> All good goals.
>>
>> > To achieve that I'd like to propose a few action items.
>> >
>> > 1. Reach a consensus about bucket and shard strategy, key handling, bucket file and metadata format, etc., anything that affects produced SMB data.
>> > 2. Revise the existing PR according to #1
>> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., but keep the existing file level abstraction
>> > 4. (Optional) Merge code into extensions::smb but mark clearly as @experimental
>> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>> >
>> > #1-4 gives us something usable in the short term, while #1 guarantees that production data produced today are usable when #5 lands on master. #4 also gives early adopters a chance to give feedback.
>> > Due to the scope of #5, it might take much longer and a couple of big PRs to achieve, which we can keep iterating on.
>> >
>> > What are your thoughts on this?
>>
>> I would like to see some resolution on the FileIO abstractions before
>> merging into experimental. (We have a FileBasedSink that would mostly
>> already work, so it's a matter of coming up with an analogous Source
>> interface.) Specifically, I would not want to merge a set of per-file-type
>> SMB IOs without a path forward to this or the determination that
>> it's not possible/desirable.

Re: Sort Merge Bucket - Action Items

Posted by Neville Li <ne...@gmail.com>.
Thanks Robert. Agree with the FileIO point. I'll look into it and see what
needs to be done.

Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So
for writes I'll probably build on top of WriteFiles. Read might be a bigger
change w.r.t. collocating ordered elements across files within a bucket and
TBH I'm not even sure where to start.
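Reading one bucket that was written as several files, each sorted by key, amounts to merging those sorted streams back into a single key-ordered stream. A standard k-way merge with a min-heap is one way to sketch that step. `BucketMerge` is a hypothetical helper, not part of Beam; it collects the result into a list for simplicity, whereas a real reader would emit records lazily (e.g. to an OutputReceiver).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch: k-way merge of several key-sorted inputs (e.g. the files of one
 * bucket) into a single key-sorted list, using a min-heap of each input's current head.
 */
final class BucketMerge {
  private BucketMerge() {}

  /** Heap entry: the current head record of one input and the iterator it came from. */
  private static final class Head<T> {
    final T value;
    final Iterator<T> source;

    Head(T value, Iterator<T> source) {
      this.value = value;
      this.source = source;
    }
  }

  static <T> List<T> mergeSorted(List<? extends Iterable<T>> inputs, Comparator<? super T> cmp) {
    PriorityQueue<Head<T>> heap = new PriorityQueue<>((a, b) -> cmp.compare(a.value, b.value));
    // Seed the heap with the first record of each non-empty input.
    for (Iterable<T> input : inputs) {
      Iterator<T> it = input.iterator();
      if (it.hasNext()) {
        heap.add(new Head<>(it.next(), it));
      }
    }
    List<T> out = new ArrayList<>();
    // Repeatedly emit the smallest head, then refill the heap from the same input.
    while (!heap.isEmpty()) {
      Head<T> smallest = heap.poll();
      out.add(smallest.value);
      if (smallest.source.hasNext()) {
        heap.add(new Head<>(smallest.source.next(), smallest.source));
      }
    }
    return out;
  }
}
```

The heap holds at most one record per file, so memory stays proportional to the number of files in the bucket rather than the number of records.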

I'll file separate PRs for core changes needed for discussion. WDYT?

On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <ro...@google.com> wrote:

> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <ne...@gmail.com> wrote:
> >
> > Forking this thread to discuss action items regarding the change. We can
> keep technical discussion in the original thread.
> >
> > Background: our SMB POC showed promising performance & cost saving
> improvements and we'd like to adopt it for production soon (by EOY). We
> want to contribute it to Beam so it's better generalized and maintained. We
> also want to avoid divergence between our internal version and the PR while
> it's in progress, specifically any breaking change in the produced SMB data.
>
> All good goals.
>
> > To achieve that I'd like to propose a few action items.
> >
> > 1. Reach a consensus about bucket and shard strategy, key handling,
> bucket file and metadata format, etc., anything that affects produced SMB
> data.
> > 2. Revise the existing PR according to #1
> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression,
> etc., but keep the existing file level abstraction
> > 4. (Optional) Merge code into extensions::smb but mark clearly as
> @experimental
> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
> >
> > #1-4 gives us something usable in the short term, while #1 guarantees
> that production data produced today are usable when #5 lands on master. #4
> also gives early adopters a chance to give feedback.
> > Due to the scope of #5, it might take much longer and a couple of big
> PRs to achieve, which we can keep iterating on.
> >
> > What are your thoughts on this?
>
> I would like to see some resolution on the FileIO abstractions before
> merging into experimental. (We have a FileBasedSink that would mostly
> already work, so it's a matter of coming up with an analogous Source
> interface.) Specifically, I would not want to merge a set of per-file-type
> SMB IOs without a path forward to this or the determination that
> it's not possible/desirable.
>

Re: Sort Merge Bucket - Action Items

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Jul 19, 2019 at 5:16 PM Neville Li <ne...@gmail.com> wrote:
>
> Forking this thread to discuss action items regarding the change. We can keep technical discussion in the original thread.
>
> Background: our SMB POC showed promising performance & cost saving improvements and we'd like to adopt it for production soon (by EOY). We want to contribute it to Beam so it's better generalized and maintained. We also want to avoid divergence between our internal version and the PR while it's in progress, specifically any breaking change in the produced SMB data.

All good goals.

> To achieve that I'd like to propose a few action items.
>
> 1. Reach a consensus about bucket and shard strategy, key handling, bucket file and metadata format, etc., anything that affects produced SMB data.
> 2. Revise the existing PR according to #1
> 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., but keep the existing file level abstraction
> 4. (Optional) Merge code into extensions::smb but mark clearly as @experimental
> 5. Incorporate ideas from the discussion, e.g. ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>
> #1-4 gives us something usable in the short term, while #1 guarantees that production data produced today are usable when #5 lands on master. #4 also gives early adopters a chance to give feedback.
> Due to the scope of #5, it might take much longer and a couple of big PRs to achieve, which we can keep iterating on.
>
> What are your thoughts on this?

I would like to see some resolution on the FileIO abstractions before
merging into experimental. (We have a FileBasedSink that would mostly
already work, so it's a matter of coming up with an analogous Source
interface.) Specifically, I would not want to merge a set of per-file-type
SMB IOs without a path forward to this or the determination that
it's not possible/desirable.