Posted to dev@beam.apache.org by Kenneth Knowles <ke...@apache.org> on 2021/02/02 18:01:19 UTC

Re: Making preview (sample) time consistent on Direct runner

On Thu, Jan 21, 2021 at 5:40 PM Robert Bradshaw <ro...@google.com> wrote:

> On Thu, Jan 21, 2021 at 12:55 PM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Thanks Kenn! That sounds like a good and achievable strategy to get
>> the first/limit results. I will check the code to see if we can reuse
>> this logic. The extra question is whether we can fit it into the direct
>> runner for the general use case (not only SQL), maybe via some
>> PipelineOptions of the runner.
>>
>> > Note that both of these don't solve the issue that Read + GBK + take(N)
>> > would have to do the full Read+GBK for a batch pipeline.
>>
>> Just to confirm that I understand correctly Robert,
>
>
> This is not just for Read. In batch, you may have [any large pipeline] +
> GBK + take(N), and due to GBK being a barrier there's no way to not execute
> all of [any large pipeline].
>

My suggestion is to run even bounded sources/SDFs in a "streaming" manner,
checkpointing and outputting once in a while. So there will be no barrier
except emitting from GBK according to triggering logic. It is true that in
the global window this will require all inputs to be completely read for
their watermarks to advance to infinity. I think if we restrict to the
direct runner this behavior of checkpointing bounded sources/SDFs is the
missing piece.
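
To make the idea concrete, here is a minimal sketch in plain Python (not
Beam code). It assumes a toy model where each bounded source is just an
iterable, and "checkpointing" means handing back a small bundle and resuming
that source later. The names CountingSource, read_in_checkpoints and take are
hypothetical and exist only for this illustration.

```python
from itertools import islice


class CountingSource:
    """Toy bounded source that records how many records were actually read."""

    def __init__(self, name, size):
        self.name = name
        self.size = size
        self.records_read = 0

    def __iter__(self):
        for i in range(self.size):
            self.records_read += 1
            yield (self.name, i)


def read_in_checkpoints(sources, bundle_size=2):
    """Visit the sources round-robin, emitting one small bundle from each
    before moving on, instead of draining a whole source at once."""
    iterators = [iter(s) for s in sources]
    while iterators:
        still_open = []
        for it in iterators:
            bundle = list(islice(it, bundle_size))
            if len(bundle) == bundle_size:
                still_open.append(it)  # may hold more records; resume later
            yield from bundle
        iterators = still_open


def take(n, elements):
    """Pull only the first n elements; nothing upstream runs past that."""
    return list(islice(elements, n))


sources = [CountingSource(f"file-{k}", 1000) for k in range(1000)]
preview = take(5, read_in_checkpoints(sources, bundle_size=2))
total_read = sum(s.records_read for s in sources)
```

In this toy model the preview of 5 elements costs 6 record reads out of a
possible million, because the reader never commits to a whole source. A real
runner would additionally need a way to cancel work that is already in
flight, which this sketch does not model.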

One trick is to essentially think of this as a "whole product" problem. It
may be possible, in context, to add triggering to get early results for
previewing a large global window pipeline. You would need to communicate
well to a user that the aggregations are incomplete. Just an idea. It does
seem fraught.
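
As a rough illustration of what early results could look like, here is a
plain-Python sketch (again not Beam code). It assumes a per-key count over a
single global window and emits a snapshot every few elements. The function
name and the "EARLY"/"FINAL" labels are hypothetical and only stand in for
whatever a real trigger would attach to a pane.

```python
from collections import Counter


def count_per_key_with_early_panes(keys, every=3):
    """Yield (label, counts) panes: an "EARLY" snapshot after every `every`
    input elements, then a single "FINAL" pane once the input is exhausted."""
    counts = Counter()
    for seen, key in enumerate(keys, start=1):
        counts[key] += 1
        if seen % every == 0:
            # Incomplete aggregation: must be presented to the user as such.
            yield ("EARLY", dict(counts))
    yield ("FINAL", dict(counts))


panes = count_per_key_with_early_panes(iter(["a", "b", "a", "c", "a"]), every=2)
label, snapshot = next(panes)  # only the first two inputs have been consumed
```

A previewer that stops after the first pane gets an answer without reading
the rest of the input, but the label is the only thing telling the user that
the counts are partial.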


>
>> you mention this
>> for example for the case of IOs where we can match 1000s of
>> `ReadableFiles` and we will necessarily end up distributing and
>> reading the thousands until we have the take(N) results. You mean we
>> cannot avoid this.
>>
>
> Yes, you'll match thousands of files, but the concurrency of your read
> depends on how many workers you have. If, say, you have 16 worker threads
> you'll quickly reach N records and be able to abort much earlier.
>
>
>> I was wondering if with SDF we could have a generic solution
>> (especially now that most translations are based on SDF), maybe some
>> sort of 'BoundedRestrictionTracker' to deal with the limit and then
>> stop producing output. Maybe Boyuan, Luke or Robert have an idea
>> whether this approach is really viable or whether there could be issues.
>> I am saying this in the context of finding a solution for all runners.
>>
>
> Something like "Please abort the pipeline once PCollection X has at least
> N records?" The one way SDF plays into this is it provides the runner the
> ability to say to these (potentially long-running user fns) "please stop
> gracefully as soon as you can."
>

Exactly. Do our "bounded" SDFs tend to obey this? For dynamic work
rebalancing, for example, I expected that they would. So I guess, as you
say, BoundedSource could work the same way.
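
For readers less familiar with the mechanism, here is a toy model of that
"please stop gracefully" contract in plain Python. It is loosely inspired by
the claim-before-emit pattern of restriction trackers, but ToyOffsetTracker,
its checkpoint method and process are hypothetical names for this sketch,
not the Beam API.

```python
import threading


class ToyOffsetTracker:
    """Simplified stand-in for a tracker over the offset range [start, stop)."""

    def __init__(self, start, stop):
        self._lock = threading.Lock()
        self._next = start  # first offset that has not been claimed yet
        self._stop = stop

    def try_claim(self, position):
        """User-fn side: ask permission before emitting `position`."""
        with self._lock:
            if position >= self._stop:
                return False
            self._next = position + 1
            return True

    def checkpoint(self):
        """Runner side: keep only what was claimed so far and return the
        unprocessed remainder as a residual (start, stop) range."""
        with self._lock:
            residual = (self._next, self._stop)
            self._stop = self._next
            return residual


def process(tracker, start, emit):
    """A well-behaved reader: it emits nothing it has not claimed first."""
    position = start
    while tracker.try_claim(position):
        emit(position)
        position += 1


tracker = ToyOffsetTracker(0, 1_000_000)
emitted, residuals = [], []


def emit(value):
    emitted.append(value)
    if len(emitted) == 3:  # the "runner" has seen enough for its preview
        residuals.append(tracker.checkpoint())


process(tracker, 0, emit)
```

The reader stops by itself after three elements because its next claim is
refused, and the residual range is what a runner could resume or simply
drop. The scheme only works if the user fn really does claim every position
before emitting it, which is the question raised above.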

Kenn


>
>>
>> On Thu, Jan 21, 2021 at 8:34 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >
>> > I don't know that SDF vs. BoundedSources changes things here--for both
>> > one can implement take(n) by running until one has N elements and then
>> > canceling the pipeline.
>> >
>> > One could have a more sophisticated First(n) operator that has a
>> > "back-edge" to checkpoint/split the upstream operators once a
>> > sufficient number of elements has been observed.
>> >
>> > Note that both of these don't solve the issue that Read + GBK + take(N)
>> > would have to do the full Read+GBK for a batch pipeline.
>> >
>> > On Thu, Jan 21, 2021 at 10:25 AM Kenneth Knowles <ke...@apache.org>
>> wrote:
>> >>
>> >> I forgot two things:
>> >>
>> >> 1. I totally agree that this is a good opportunity to make Beam more
>> >> useful. Different engines sometimes have their own similar abilities,
>> >> but making it available across the runners and xlang transforms, etc.,
>> >> is way cool.
>> >> 2. You can of course do the same trick for a distributed runner by
>> >> using a message queue between the pipeline and the controller program.
>> >> And interactive Beam Java, or improving/unifying the concepts between
>> >> Python/Java/SQL (Go?) would be great. Not sure how much code can be
>> >> reused.
>> >>
>> >> Kenn
>> >>
>> >> On Thu, Jan 21, 2021 at 10:15 AM Kenneth Knowles <ke...@apache.org>
>> wrote:
>> >>>
>> >>> I think the approach used in the SQL CLI to implement a LIMIT clause
>> >>> may work for some cases. It only works in the same process with the
>> >>> DirectRunner. It doesn't sample at the source, because you never know
>> >>> what will happen in the query. Instead it counts outputs and then
>> >>> cancels the job when it has enough:
>> >>> https://github.com/apache/beam/blob/a72460272354747a54449358f5df414be4b6d72c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java#L200
>> >>>
>> >>> However if your pipeline starts with a read of 1000s of files it may
>> >>> be a different pattern for invoking SDF:
>> >>>
>> >>> 1. initial splits do not matter much, probably
>> >>> 2. you want to checkpoint and emit values so that the end output of
>> >>> the pipeline can receive them to cancel it; you don't want to read a
>> >>> whole restriction like in a batch case
>> >>>
>> >>> I don't know the status of this, if it needs special treatment or
>> >>> not. There may also be the issue that SDF is more actively developed
>> >>> in portable runners and less so in classic runners.
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Wed, Jan 6, 2021 at 9:05 AM Ismaël Mejía <ie...@gmail.com>
>> wrote:
>> >>>>
>> >>>> > Those are good points. Do you know if the Interactive Runner has
>> >>>> > been tried in those instances? If so, what were the shortcomings?
>> >>>>
>> >>>> I am not aware of experiences or shortcomings with the Interactive
>> >>>> Runner. The issue is that the Interactive Runner is based on Python
>> >>>> and all the tools I mention above are Java-based, so Python probably
>> >>>> won't be a valid alternative.
>> >>>>
>> >>>> What is concerning for me is that in other similar systems (e.g.
>> >>>> Spark, Flink) a developer can consistently do a `.take(n)` read from
>> >>>> a data source and have results in constant time almost independently
>> >>>> of the size of the targeted data. This allows one to iterate faster
>> >>>> and improves the developer experience.
>> >>>>
>> >>>> What is not clear for me yet is how we can achieve this in a clean
>> >>>> way, given all the 'wrappings' we already have in translation time. I
>> >>>> don't know if there could be a way to override some default
>> >>>> translation(s) to achieve this. Any ideas maybe?
>> >>>>
>> >>>>
>> >>>> On Tue, Jan 5, 2021 at 10:26 PM Sam Rohde <sr...@google.com> wrote:
>> >>>> >
>> >>>> > Hi Ismael,
>> >>>> >
>> >>>> > Those are good points. Do you know if the Interactive Runner has
>> >>>> > been tried in those instances? If so, what were the shortcomings?
>> >>>> >
>> >>>> > I can also see the use of sampling for a performance benchmarking
>> >>>> > reason. We have seen others send in known elements which are tracked
>> >>>> > throughout the pipeline to generate timings for each transform/stage.
>> >>>> >
>> >>>> > -Sam
>> >>>> >
>> >>>> > On Fri, Dec 18, 2020 at 8:24 AM Ismaël Mejía <ie...@gmail.com>
>> wrote:
>> >>>> >>
>> >>>> >> Hello,
>> >>>> >>
>> >>>> >> The use of the direct runner for interactive local use cases has
>> >>>> >> increased over the years in Beam due to projects like Scio,
>> >>>> >> Kettle/Hop and our own SQL CLI. All these tools have one thing in
>> >>>> >> common: they show a sample of some source input to the user and
>> >>>> >> interactively apply transforms to it to help users build pipelines
>> >>>> >> more rapidly.
>> >>>> >>
>> >>>> >> If you build a pipeline today to produce this sample from a set of
>> >>>> >> files using Beam’s Sample transform, the read of the files happens
>> >>>> >> first and then the sample, so the more files there are or the bigger
>> >>>> >> they are, the longer it takes to produce the sample, even if the
>> >>>> >> number of elements expected to be read is constant.
>> >>>> >>
>> >>>> >> During Beam Summit last year there were some discussions about how we
>> >>>> >> could improve this scenario (and others), but I have the impression
>> >>>> >> that no further discussion happened on the mailing list, so I wanted
>> >>>> >> to know if there are some ideas about how we can get the direct
>> >>>> >> runner to improve this case.
>> >>>> >>
>> >>>> >> It seems to me that we can still ‘force’ the count with some static
>> >>>> >> field because it is not a distributed case, but I don’t know how we
>> >>>> >> can stop reading once we have the number of sampled elements in a
>> >>>> >> generic way, especially since it now seems to me a bit harder to do
>> >>>> >> with pure DoFn (SDF) APIs vs. the old Source ones, but well, that’s
>> >>>> >> just a guess.
>> >>>> >>
>> >>>> >> Does anyone have an idea of how we could generalize this and, of
>> >>>> >> course, if you see the value of such a use case, other ideas for
>> >>>> >> improvements?
>> >>>> >> Regards,
>> >>>> >> Ismaël
>>
>