Posted to dev@beam.apache.org by Aviem Zur <av...@gmail.com> on 2017/04/16 11:27:53 UTC

Re: Pipeline termination in the unified Beam model

+1

To help integrate this, we can start by adding `ValidatesRunner` tests in a
new category, run them only on the runners that already adhere to the rules
described here, and eventually run them on all runners.

On Fri, Mar 3, 2017 at 12:46 AM Amit Sela <am...@gmail.com> wrote:

> +1 on Eugene's words - this shows how batch is conceptually a subset of a
> streaming problem.
> I also believe that Stas has a very good point about education - we have to
> try to understand developers' current perspective and make the
> transition to the Beam model as natural as possible for new users.
> In addition to good documentation and examples, I think that
> https://issues.apache.org/jira/browse/BEAM-849 is critical, as this is the
> user's end-point to the behaviours discussed here, and so it should be:
> * clear and concise - pipeline state at any point should be informative.
> * well documented - documentation, examples, and use-cases (e.g., Eugene's
> "poison pill").
> * strict API for runners - seconding Stas' note on a unified implementation
> for portability.
>
> On Thu, Mar 2, 2017 at 8:49 PM Eugene Kirpichov
> <ki...@google.com.invalid> wrote:
>
> > OK, I'm glad everybody is in agreement on this. I raised this point
> > because we've been discussing implementing this behavior in the Dataflow
> > streaming runner, and I wanted to make sure that people are okay with it
> > from a conceptual point of view before proceeding.
> >
> > On Thu, Mar 2, 2017 at 10:27 AM Kenneth Knowles <kl...@google.com.invalid>
> > wrote:
> >
> > Isn't this already the case? I think semantically it is an unavoidable
> > conclusion, so certainly +1 to that.
> >
> > The DirectRunner and TestDataflowRunner both have this behavior already.
> > I've always considered that a streaming job running forever is just [very]
> > suboptimal shutdown latency :-)
> >
> > Some parts of the discussion on the ticket seem to revolve around whether,
> > or how, to communicate this property in a generic way. Since a runner owns
> > its PipelineResult, that doesn't seem necessary.
> >
> > So is the bottom line just that you want to insist more strongly that
> > runners really terminate in a timely manner? I'm +1 to that, too, for
> > basically the reason Stas gives: in order to orchestrate Beam pipelines
> > programmatically and portably, you do need to know whether the pipeline
> > will finish without thinking about the specific runner and its options
> > (as with our RunnableOnService tests).
> >
> > Kenn
> >
> > On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalperi@google.com.invalid>
> > wrote:
> >
> > > Note that even "unbounded pipeline in a streaming runner".waitUntilFinish()
> > > can return, e.g., if you cancel it or terminate it. It's totally reasonable
> > > for users to want to understand and handle these cases.
> > >
> > > +1
> > >
> > > Dan
> > >
> > > On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> > > wrote:
> > >
> > > > +1
> > > >
> > > > Good idea !!
> > > >
> > > > Regards
> > > > JB
> > > >
> > > >
> > > > On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
> > > >
> > > >> Raising this onto the mailing list from
> > > >> https://issues.apache.org/jira/browse/BEAM-849
> > > >>
> > > >> The issue came up: what does it mean for a pipeline to finish, in the
> > > >> Beam model?
> > > >>
> > > >> Note that I am deliberately not talking about "batch" and "streaming"
> > > >> pipelines, because this distinction does not exist in the model.
> > > >> Several runners have batch/streaming *modes*, which implement the same
> > > >> semantics (potentially different subsets: in batch mode, a runner will
> > > >> typically reject pipelines that have at least one unbounded PCollection)
> > > >> but in an operationally different way. However, we should define
> > > >> pipeline termination at the level of the unified model, and then make
> > > >> sure that all runners in all modes implement it properly.
> > > >>
> > > >> One natural way is to say "a pipeline terminates when the output
> > > >> watermarks of all of its PCollections progress to +infinity". (Note:
> > > >> this can be generalized, I guess, to partial executions of a pipeline:
> > > >> if you're interested in the full contents of only some collections, then
> > > >> you wait until only the watermarks of those collections progress to
> > > >> +infinity.)
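A minimal sketch of the rule above, in plain Java rather than the Beam SDK: each PCollection's output watermark is modelled as a `long`, with `Long.MIN_VALUE` standing in for -infinity and `Long.MAX_VALUE` for +infinity. The class `WatermarkModel` and its methods are hypothetical illustrations, not a Beam API. A full run terminates once every registered collection reaches +infinity; a partial run only checks the collections it is asked about.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the proposed termination rule; not part of the Beam SDK.
class WatermarkModel {
    // Stand-ins for -infinity and +infinity in this sketch.
    static final long NEG_INF = Long.MIN_VALUE;
    static final long POS_INF = Long.MAX_VALUE;

    // Output watermark per PCollection, keyed by a (hypothetical) collection name.
    private final Map<String, Long> watermarks = new HashMap<>();

    // A newly registered collection starts at -infinity: nothing is known yet.
    void register(String collection) {
        watermarks.put(collection, NEG_INF);
    }

    // Watermarks only move forward; an attempt to move one backwards is ignored.
    void advance(String collection, long watermark) {
        Long current = watermarks.get(collection);
        if (current == null) {
            throw new IllegalArgumentException("Unknown collection: " + collection);
        }
        watermarks.put(collection, Math.max(current, watermark));
    }

    // Full termination: every output watermark has reached +infinity.
    boolean isTerminated() {
        return isTerminated(watermarks.keySet());
    }

    // Partial execution: only the collections of interest must reach +infinity.
    boolean isTerminated(Set<String> collections) {
        for (String c : collections) {
            Long wm = watermarks.get(c);
            if (wm == null) {
                throw new IllegalArgumentException("Unknown collection: " + c);
            }
            if (wm != POS_INF) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        WatermarkModel p = new WatermarkModel();
        p.register("lines");
        p.register("counts");
        p.advance("lines", POS_INF);  // "lines" is complete
        p.advance("counts", 1_000L);  // "counts" is still in progress
        System.out.println(p.isTerminated());                // false
        System.out.println(p.isTerminated(Set.of("lines"))); // true
    }
}
```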
> > > >>
> > > >> A typical "batch" runner mode does not implement watermarks - we can
> > > >> think of it as assigning watermark -infinity to the output of a
> > > >> transform that hasn't started executing yet, and +infinity to the output
> > > >> of a transform that has finished executing. This is consistent with how
> > > >> such runners implement termination in practice.
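The batch-mode reading above can be sketched the same way. This is a hypothetical illustration (the `BatchWatermarks` class and `TransformState` enum are not Beam APIs), and it assumes that a transform which is still running reports no partial progress, so its implied output watermark stays at -infinity until it finishes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: reading a watermark-less batch runner through the watermark lens.
class BatchWatermarks {
    enum TransformState { NOT_STARTED, RUNNING, FINISHED }

    static final long NEG_INF = Long.MIN_VALUE; // stand-in for -infinity
    static final long POS_INF = Long.MAX_VALUE; // stand-in for +infinity

    // Implied output watermark of a transform. Assumption: a running transform
    // reports no partial progress in batch mode, so it stays at -infinity.
    static long impliedWatermark(TransformState state) {
        return state == TransformState.FINISHED ? POS_INF : NEG_INF;
    }

    // The pipeline terminates when every implied output watermark is +infinity,
    // which here is the same as every transform having finished.
    static boolean terminated(Map<String, TransformState> transforms) {
        return transforms.values().stream()
                .allMatch(s -> impliedWatermark(s) == POS_INF);
    }

    public static void main(String[] args) {
        Map<String, TransformState> stages = new LinkedHashMap<>();
        stages.put("Read", TransformState.FINISHED);
        stages.put("Count", TransformState.RUNNING);
        System.out.println(terminated(stages)); // false
        stages.put("Count", TransformState.FINISHED);
        System.out.println(terminated(stages)); // true
    }
}
```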
> > > >>
> > > >> The Dataflow streaming runner additionally implements such termination
> > > >> for the pipeline drain operation, which has two parts: 1) stop
> > > >> consuming input from the sources, and 2) wait until all watermarks
> > > >> progress to +infinity.
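A toy simulation of the two-part drain, assuming a single source feeding one downstream stage. `DrainSketch` is hypothetical and deliberately simplified: it does not model how a live source estimates its watermark, only what happens once input stops. The downstream watermark is bounded by the oldest in-flight element, so it reaches +infinity only after all buffered work has been flushed.

```java
import java.util.PriorityQueue;

// Hypothetical two-stage pipeline (source -> one downstream stage) illustrating
// a two-part drain. Not Dataflow's or Beam's actual implementation.
class DrainSketch {
    static final long POS_INF = Long.MAX_VALUE; // stand-in for +infinity

    boolean consumingInput = true;
    // This sketch does not model live watermark estimation, so both start at -infinity.
    long sourceWatermark = Long.MIN_VALUE;
    long downstreamWatermark = Long.MIN_VALUE;
    // Timestamps of elements read from the source but not yet processed downstream.
    final PriorityQueue<Long> inFlight = new PriorityQueue<>();

    // Normal streaming operation: read one element with the given event timestamp.
    void read(long timestamp) {
        if (timestamp == POS_INF) {
            throw new IllegalArgumentException("+infinity is not a valid element timestamp");
        }
        if (!consumingInput) {
            return; // once a drain has started, new input is ignored
        }
        inFlight.add(timestamp);
    }

    // Downstream watermark: bounded by the source watermark and by the oldest
    // element still in flight; it never moves backwards.
    private void updateDownstreamWatermark() {
        Long oldestPending = inFlight.peek();
        long bound = oldestPending == null
                ? sourceWatermark
                : Math.min(sourceWatermark, oldestPending);
        downstreamWatermark = Math.max(downstreamWatermark, bound);
    }

    // Part 1: stop consuming input. Since nothing more will be read,
    // the source's output watermark can jump to +infinity.
    void stopConsumingInput() {
        consumingInput = false;
        sourceWatermark = POS_INF;
    }

    // Part 2: keep processing in-flight elements until the downstream watermark
    // also reaches +infinity. Returns the number of elements flushed.
    int drain() {
        stopConsumingInput();
        int flushed = 0;
        updateDownstreamWatermark();
        while (downstreamWatermark != POS_INF) {
            inFlight.poll(); // "process" the oldest pending element
            flushed++;
            updateDownstreamWatermark();
        }
        return flushed;
    }

    public static void main(String[] args) {
        DrainSketch p = new DrainSketch();
        p.read(10);
        p.read(5);
        p.read(20);
        int flushed = p.drain();
        p.read(30); // ignored: input is no longer consumed
        // prints: 3 true 0
        System.out.println(flushed + " " + (p.downstreamWatermark == POS_INF) + " " + p.inFlight.size());
    }
}
```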
> > > >>
> > > >> Let us fill the gap by making this part of the Beam model and
> > > >> declaring that all runners should implement this behavior. This will
> > > >> give nice properties, e.g.:
> > > >> - A pipeline that has only bounded collections can be run by any
> > > >> runner in any mode, with the same results and termination behavior.
> > > >> (This is actually my motivating example for raising this issue: I was
> > > >> running Splittable DoFn tests
> > > >> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> > > >> with the streaming Dataflow runner - these tests produce only bounded
> > > >> collections - and noticed that they wouldn't terminate even though all
> > > >> data was processed.)
> > > >> - It will be possible to implement pipelines that stream data for a
> > > >> while and then eventually terminate successfully based on some
> > > >> condition, e.g. a pipeline that watches a continuously growing file
> > > >> until it is marked read-only, or a pipeline that reads a Kafka topic
> > > >> partition until it receives a "poison pill" message. This seems handy.
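A sketch of the "poison pill" case, assuming an in-memory iterator in place of a Kafka topic partition and an agreed sentinel payload (`POISON_PILL` here is hypothetical, as is the `PoisonPillSource` class). Once the sentinel is read, the source stops reading and moves its output watermark to +infinity, which under the proposed rule is what lets the pipeline terminate successfully. The message offset stands in for an event timestamp purely to keep the example small.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical source that reads messages until it sees a sentinel "poison pill".
class PoisonPillSource {
    static final long POS_INF = Long.MAX_VALUE; // stand-in for +infinity
    // Assumed sentinel payload; a real pipeline would agree on its own marker.
    static final String POISON_PILL = "POISON_PILL";

    private final Iterator<String> partition; // stand-in for a topic partition
    private long watermark = Long.MIN_VALUE;
    private long offset = 0;
    private boolean done = false;

    PoisonPillSource(Iterator<String> partition) {
        this.partition = partition;
    }

    // Reads one message and returns it. Returns null either when the source has
    // finished or when no data is available right now; use isDone() to tell them apart.
    String readNext() {
        if (done || !partition.hasNext()) {
            return null;
        }
        String message = partition.next();
        if (POISON_PILL.equals(message)) {
            done = true;
            watermark = POS_INF; // no more data: output watermark moves to +infinity
            return null;
        }
        watermark = offset++; // the offset stands in for an event timestamp here
        return message;
    }

    long watermark() { return watermark; }

    boolean isDone() { return done; }

    public static void main(String[] args) {
        List<String> topic = List.of("a", "b", POISON_PILL, "c");
        PoisonPillSource source = new PoisonPillSource(topic.iterator());
        List<String> emitted = new ArrayList<>();
        String m;
        while ((m = source.readNext()) != null) {
            emitted.add(m);
        }
        System.out.println(emitted + " " + (source.watermark() == POS_INF)); // [a, b] true
    }
}
```

Messages after the sentinel (here `"c"`) are never read, so the source can finish even though the underlying partition is not exhausted.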
> > > >>
> > > >>
> > > > --
> > > > Jean-Baptiste Onofré
> > > > jbonofre@apache.org
> > > > http://blog.nanthrax.net
> > > > Talend - http://www.talend.com
> > > >
> > >
> >
>