Posted to dev@beam.apache.org by Claire McGinty <cl...@gmail.com> on 2019/08/12 14:37:45 UTC

Re: Sort Merge Bucket - Action Items

Hi! Wanted to bump this thread with some updated PRs that reflect these
discussions (updated IOs that parameterize FileIO#Sink, and re-use
ReadableFile). The base pull requests are:

- https://github.com/apache/beam/pull/8823 (BucketMetadata implementation)
- https://github.com/apache/beam/pull/8824/ (FileOperations/IO implementations)

And the actual PTransform implementations build on top of those PRs:
- https://github.com/apache/beam/pull/9250/ (SMB Sink transform)
- https://github.com/apache/beam/pull/9251 (SMB Source transform)

Finally, we have some benchmarks/style changes (using the AutoValue/Builder
pattern) for those PTransforms:
- https://github.com/apache/beam/pull/9253/ (high-level API classes/style fixes)
- https://github.com/apache/beam/pull/9279 (benchmarks for the SMB sink and
source transforms)
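
To make the FileIO#Sink parameterization concrete, here's a rough sketch of
the write-side shape (class and method names are approximations of the PRs
above, not the final API):

  // Sketch only: the SMB sink takes any FileIO.Sink<T>, so the same
  // bucket/sort/shard logic is reused across formats (Avro, JSON, etc.).
  PCollection<MyEvent> records = ...;  // MyEvent: a stand-in Avro-generated type
  records.apply(
      SortedBucketSink.<String, MyEvent>create(
          bucketMetadata,                    // key fn + #buckets/#shards (PR #8823)
          outputDirectory,
          () -> AvroIO.sink(MyEvent.class),  // any FileIO.Sink<T> plugs in here
          Compression.UNCOMPRESSED));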

I know it's a lot of pull requests at once -- let us know if there's
anything else we can clarify or streamline. Thanks!

- Claire/Neville



On Fri, Jul 26, 2019 at 12:45 PM Kenneth Knowles <ke...@apache.org> wrote:

> There is still considerable value in knowing data sources statically so
> you can do things like fetch sizes and other metadata and adjust pipeline
> shape. I would not expect to delete these, but to implement them on top of
> SDF while still giving them a clear URN and payload so runners can know
> that it is a statically-specified source.
>
> Kenn
>
> On Fri, Jul 26, 2019 at 3:23 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>> >
>> > Hi Gleb,
>> >
>> > Regarding the future of io.Read: ideally things would go as follows:
>> > - All runners support SDF at feature parity with Read (mostly this is
>> just the Dataflow runner's liquid sharding and size estimation for bounded
>> sources, and backlog for unbounded sources, but I recall that a couple of
>> other runners also used size estimation)
>> > - Bounded/UnboundedSource APIs are declared "deprecated" - it is
>> > forbidden to add any new implementations to the SDK, and users shouldn't
>> > use them either (note: I believe it's already effectively forbidden to use
>> > them for cases where a DoFn/SDF at the current level of support will be
>> > sufficient)
>> > - People one by one rewrite existing Bounded/UnboundedSource based
>> PTransforms in the SDK to use SDFs instead
>> > - Read.from() is rewritten to use a wrapper SDF over the given Source,
>> and explicit support for Read is deleted from runners
>> > - In the next major version of Beam - presumably 3.0 - the Read
>> transform itself is deleted
>> >
>> > I don't know the current status of SDF/Read feature parity; maybe Luke or
>> > Cham can comment. An alternative path is offered in
>> > http://s.apache.org/sdf-via-source.
>>
>> Python supports initial splitting for SDF of all sources on portable
>> runners. Dataflow support for batch SDF is undergoing testing, not yet
>> rolled out. Dataflow support for streaming SDF is awaiting portable
>> state/timer support.
>>
>> > On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov <gl...@spotify.com> wrote:
>> >>
>> >> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
>> >> away in favor of SDF, or are we always going to have both?
>> >>
>> >> I was looking into AvroIO.read and AvroIO.readAll; both of them use
>> >> AvroSource. AvroIO.readAll uses SDF, and it's implemented with
>> >> ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking
>> >> at ReadAllViaFileBasedSource, I don't find it necessary to use Source<?>;
>> >> it should be enough to have something like (KV<ReadableFile, OffsetRange>,
>> >> OutputReceiver<T>), as we have discussed in this thread, and that should
>> >> be fine for SMB as well. It would require duplicating code from
>> >> AvroSource, but in the end I don't see that as a problem if AvroSource is
>> >> going away.
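>> >>
>> >> Concretely, something like this (hypothetical name) should be all that
>> >> ReadFileRangesFn needs:
>> >>
>> >>   // Hypothetical interface: just enough to replace Source<?> in
>> >>   // ReadAllViaFileBasedSource; an Avro impl would wrap DataFileReader.
>> >>   public interface FileRangeReader<T> extends Serializable {
>> >>     void read(KV<FileIO.ReadableFile, OffsetRange> fileRange,
>> >>         DoFn.OutputReceiver<T> out) throws IOException;
>> >>   }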
>> >>
>> >> I'm attaching a small diagram I put together for myself to better
>> >> understand the code.
>> >>
>> >> AvroIO.readAll :: PTransform<PBegin, PCollection<T>> ->
>> >>   FileIO.matchAll :: PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
>> >>   FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>>
>> >>   AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> ->
>> >>     ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>, PCollection<T>> ->
>> >>       ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile, OffsetRange>>) (splittable do fn)
>> >>       Reshuffle.viaRandomKey()
>> >>       ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile, OffsetRange>, T>) where
>> >>         createSource :: String -> FileBasedSource<T>
>> >>         createSource = AvroSource
>> >>
>> >> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin, PCollection<T>> ->
>> >>   Read.Bounded.from(createSource) where
>> >>     createSource :: String -> FileBasedSource<T>
>> >>     createSource = AvroSource
>> >>
>> >>
>> >> Gleb
>> >>
>> >>
>> >> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>>
>> >>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <ke...@apache.org>
>> wrote:
>> >>> >
>> >>> > From the peanut gallery, keeping a separate implementation for SMB
>> seems fine. Dependencies are serious liabilities for both upstream and
>> downstream. It seems like the reuse angle is generating extra work, and
>> potentially making already-complex implementations more complex, instead of
>> helping things.
>> >>>
>> >>> +1
>> >>>
>> >>> To be clear, what I care about is that WriteFiles(X) and
>> >>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>> >>> TFRecord, ...}. In other words, composability of the API (vs. manually
>> >>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>> >>> opportunities for (easy, clean) implementation sharing, that'd be
>> >>> nice, but not the primary goal.
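>> >>>
>> >>> Concretely, something like this (WriteSmbFiles is a made-up name):
>> >>>
>> >>>   // One X per format; both the existing and the SMB write consume it
>> >>>   // as-is. records: PCollection<MyEvent> (a stand-in Avro type).
>> >>>   FileIO.Sink<MyEvent> x = AvroIO.sink(MyEvent.class);
>> >>>   records.apply(FileIO.<MyEvent>write().via(x).to(dir));       // existing
>> >>>   records.apply(WriteSmbFiles.of(x, bucketMetadata).to(dir));  // SMB (hypothetical)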
>> >>>
>> >>> (Similarly for reading, though that seems less obvious. Certainly
>> >>> whatever T is useful for ReadSmb(T) could be useful for a
>> >>> (non-liquid-sharding) ReadAll(T) however.)
>> >>>
>> >>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <ne...@gmail.com>
>> wrote:
>> >>> >>
>> >>> >> I spoke too soon. Turns out for unsharded writes, numShards can't
>> be determined until the last finalize transform, which is again different
>> from the current SMB proposal (static number of buckets & shards).
>> >>> >> I'll end up with more code specialized for SMB in order to
>> generalize existing sink code, which I think we all want to avoid.
>> >>> >>
>> >>> >> It seems the only option is duplicating some logic, like temp file
>> >>> >> handling, which is exactly what we did in the original PR.
>> >>> >> I can reuse Compression & Sink<T> for file-level writes, but that
>> >>> >> seems to be about the most I can reuse right now.
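>> >>> >>
>> >>> >> Roughly, the file-level reuse looks like this (tempFile, sinkFn and
>> >>> >> sortedBucket stand in for SMB-side values):
>> >>> >>
>> >>> >>   WritableByteChannel channel =
>> >>> >>       compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY));
>> >>> >>   FileIO.Sink<T> sink = sinkFn.get();  // e.g. () -> AvroIO.sink(MyEvent.class)
>> >>> >>   sink.open(channel);
>> >>> >>   for (T record : sortedBucket) {
>> >>> >>     sink.write(record);
>> >>> >>   }
>> >>> >>   sink.flush();
>> >>> >>   channel.close();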
>> >>> >>
>> >>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <ne...@gmail.com>
>> wrote:
>> >>> >>>
>> >>> >>> So I spent one afternoon trying some ideas for reusing the last few
>> >>> >>> transforms in WriteFiles.
>> >>> >>>
>> >>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>>
>> >>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>>
>> >>> >>> => FinalizeTempFileBundles extends PTransform<PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>
>> >>> >>>
>> >>> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId>
>> >>> >>> so I can use pre-computed SMB destination file names for the transforms.
>> >>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
>> private and easy to change/pull out.
>> >>> >>>
>> >>> >>> OTOH they are somewhat coupled with the package-private
>> >>> >>> {Avro,Text,TFRecord}Sink and their WriteOperation impls (where the
>> >>> >>> bulk of temp file handling logic lives). It might be hard to decouple
>> >>> >>> them, whether by modifying existing code or creating new transforms,
>> >>> >>> unless we rewrite most of FileBasedSink from scratch.
>> >>> >>>
>> >>> >>> Let me know if I'm on the wrong track.
>> >>> >>>
>> >>> >>> WIP branch: https://github.com/spotify/beam/tree/neville/write-files
>> >>> >>>
>> >>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <
>> chamikara@google.com> wrote:
>> >>> >>>>
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>> >>>>>
>> >>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>> >>> >>>>> >
>> >>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>> >>>>> >>
>> >>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <
>> neville.lyh@gmail.com> wrote:
>> >>> >>>>> >> >
>> >>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into
>> it and see what needs to be done.
>> >>> >>>>> >> >
>> >>> >>>>> >> > Eugene pointed out that we shouldn't build on
>> FileBased{Source,Sink}. So for writes I'll probably build on top of
>> WriteFiles.
>> >>> >>>>> >>
>> >>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>> >>> >>>>> >>
>> >>> >>>>> >>
>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>> >>> >>>>> >
>> >>> >>>>> > Yeah, if possible, parameterize by FileIO.Sink.
>> >>> >>>>> > I would also recommend against building on top of WriteFiles.
>> >>> >>>>> > FileIO being implemented on top of WriteFiles was supposed to be a
>> >>> >>>>> > temporary measure - the longer-term plan was to rewrite it from
>> >>> >>>>> > scratch (albeit with a similar structure) and throw away WriteFiles.
>> >>> >>>>> > If possible, I would recommend pursuing this path: if there are
>> >>> >>>>> > parts of WriteFiles you want to reuse, implement them as new
>> >>> >>>>> > transforms, not at all tied to FileBasedSink (but OK if tied to
>> >>> >>>>> > FileIO.Sink), with the goal in mind that FileIO could be rewritten
>> >>> >>>>> > on top of these new transforms, or parts of WriteFiles could be
>> >>> >>>>> > swapped out for them incrementally.
>> >>> >>>>>
>> >>> >>>>> Thanks for the feedback. There's a lot that was done, but
>> looking at
>> >>> >>>>> the code it feels like there's a lot that was not yet done
>> either, and
>> >>> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not
>> finding
>> >>> >>>>> the right docs).
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> I'm also a bit unfamiliar with the original plans for WriteFiles and
>> >>> >>>> for updating source interfaces, but I prefer not to significantly
>> >>> >>>> modify existing IO transforms to suit the SMB use-case. If there are
>> >>> >>>> existing pieces of code that can be easily re-used, that is fine, but
>> >>> >>>> existing sources/sinks are designed to perform a PCollection -> file
>> >>> >>>> transformation and vice versa with (usually) runner-determined
>> >>> >>>> sharding. Things specific to SMB, such as sharding restrictions,
>> >>> >>>> writing metadata to a separate file, and reading multiple files from
>> >>> >>>> the same abstraction, do not sound like features that should be
>> >>> >>>> included in our usual file read/write transforms.
>> >>> >>>>
>> >>> >>>>>
>> >>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered
>> elements across files within a bucket and TBH I'm not even sure where to
>> start.
>> >>> >>>>> >>
>> >>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>> >>> >>>>> >> Iterable<T>. There are existing
>> >>> >>>>> >> PTransform<PCollection<ReadableFile>, PCollection<T>>s, but such
>> >>> >>>>> >> an interface is insufficient to extract ordered records per
>> >>> >>>>> >> shard. It seems the only concrete implementations are based on
>> >>> >>>>> >> FileBasedSource, which we'd like to avoid, but there's no
>> >>> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>> >>> >>>>> >> cumbersome to call (given the reflection machinery involved in
>> >>> >>>>> >> invoking DoFns).
>> >>> >>>>> >
>> >>> >>>>> > Seems easiest to just define a new regular Java interface for
>> >>> >>>>> > this. It could be either, indeed, ReadableFile -> Iterable<T>, or
>> >>> >>>>> > something analogous, e.g. (ReadableFile, OutputReceiver<T>) ->
>> >>> >>>>> > void. It depends on how much control over iteration you need.
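>> >>> >>>>> >
>> >>> >>>>> > For instance (hypothetical names):
>> >>> >>>>> >
>> >>> >>>>> >   // Pull-based: the caller drives iteration, which makes it easy
>> >>> >>>>> >   // to merge several files.
>> >>> >>>>> >   interface FileReadFn<T> extends Serializable {
>> >>> >>>>> >     Iterable<T> read(FileIO.ReadableFile file) throws IOException;
>> >>> >>>>> >   }
>> >>> >>>>> >   // Push-based: the callee drives iteration, which keeps
>> >>> >>>>> >   // per-format implementations simple.
>> >>> >>>>> >   interface FileStreamFn<T> extends Serializable {
>> >>> >>>>> >     void read(FileIO.ReadableFile file, DoFn.OutputReceiver<T> out)
>> >>> >>>>> >         throws IOException;
>> >>> >>>>> >   }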
>> >>> >>>>>
>> >>> >>>>> For this application, one wants to iterate over several files in
>> >>> >>>>> parallel. The downside of a new interface is that it shares almost
>> >>> >>>>> nothing with the "normal" sources (e.g. when features (or
>> >>> >>>>> optimizations) get added to one, they won't get added to the other).
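>> >>> >>>>>
>> >>> >>>>> (For SMB that's essentially a k-way merge of per-bucket readers;
>> >>> >>>>> with the pull-based shape above, e.g. Guava's Iterators.mergeSorted:)
>> >>> >>>>>
>> >>> >>>>>   // bucketIterators/keyComparator are SMB-side values; each
>> >>> >>>>>   // Iterator<T> comes from one sorted bucket file.
>> >>> >>>>>   Iterator<T> merged =
>> >>> >>>>>       Iterators.mergeSorted(bucketIterators, keyComparator);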
>> >>> >>>>
>> >>> >>>>
>> >>> >>>>>
>> >>> >>>>>
>> >>> >>>>> > And yes, DoFns, including SDFs, are not designed to be used as
>> >>> >>>>> > Java interfaces per se. If you need DoFn machinery in this
>> >>> >>>>> > interface (e.g. side inputs), use Contextful - s.apache.org/context-fn.
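>> >>> >>>>> >
>> >>> >>>>> > (Sketch, reusing the pull-based shape from above; readWithConfig
>> >>> >>>>> > and configView are made up:)
>> >>> >>>>> >
>> >>> >>>>> >   Contextful<Contextful.Fn<FileIO.ReadableFile, Iterable<T>>> readFn =
>> >>> >>>>> >       Contextful.fn(
>> >>> >>>>> >           (file, c) -> readWithConfig(file, c.sideInput(configView)),
>> >>> >>>>> >           Requirements.requiresSideInputs(configView));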
>> >>> >>>>>
>> >>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard
>> it is
>> >>> >>>>> to build new DoFns out of others (or, really, use them in any
>> context
>> >>> >>>>> other than as an argument to ParDo).
>> >>> >>>>>
>> >>> >>>>> >> > I'll file separate PRs for core changes needed for
>> discussion. WDYT?
>> >>> >>>>> >>
>> >>> >>>>> >> Sounds good.
>> >>> >>>>
>> >>> >>>>
>> >>> >>>> +1
>> >>> >>>>
>> >>> >>>>>
>> >>> >>>>> >>
>> >>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>> >>>>> >> >>
>> >>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <
>> neville.lyh@gmail.com> wrote:
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > Forking this thread to discuss action items regarding
>> the change. We can keep technical discussion in the original thread.
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > Background: our SMB POC showed promising performance &
>> cost saving improvements and we'd like to adopt it for production soon (by
>> EOY). We want to contribute it to Beam so it's better generalized and
>> maintained. We also want to avoid divergence between our internal version
>> and the PR while it's in progress, specifically any breaking change in the
>> produced SMB data.
>> >>> >>>>> >> >>
>> >>> >>>>> >> >> All good goals.
>> >>> >>>>> >> >>
>> >>> >>>>> >> >> > To achieve that I'd like to propose a few action items.
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
>> >>> >>>>> >> >> > handling, bucket file and metadata format, etc., and anything
>> >>> >>>>> >> >> > else that affects the produced SMB data.
>> >>> >>>>> >> >> > 2. Revise the existing PR according to #1
>> >>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
>> >>> >>>>> >> >> > Compression, etc., but keep the existing file-level abstraction
>> >>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark it
>> >>> >>>>> >> >> > clearly as @experimental
>> >>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
>> >>> >>>>> >> >> > GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > #1-4 give us something usable in the short term, while #1
>> >>> >>>>> >> >> > guarantees that production data produced today remain usable
>> >>> >>>>> >> >> > when #5 lands on master. #4 also gives early adopters a
>> >>> >>>>> >> >> > chance to give feedback.
>> >>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a
>> >>> >>>>> >> >> > couple of big PRs to achieve, which we can keep iterating on.
>> >>> >>>>> >> >> >
>> >>> >>>>> >> >> > What are your thoughts on this?
>> >>> >>>>> >> >>
>> >>> >>>>> >> >> I would like to see some resolution on the FileIO abstractions
>> >>> >>>>> >> >> before merging into experimental. (We have a FileBasedSink that
>> >>> >>>>> >> >> would mostly already work, so it's a matter of coming up with
>> >>> >>>>> >> >> an analogous Source interface.) Specifically, I would not want
>> >>> >>>>> >> >> to merge a set of per-file-type SMB IOs without a path forward
>> >>> >>>>> >> >> to this, or the determination that it's not possible/desirable.
>> >>
>> >>
>> >>
>> >> --
>> >> Cheers,
>> >> Gleb
>>
>