You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Aljoscha Krettek <al...@apache.org> on 2016/05/16 16:00:30 UTC

Re: [PROPOSAL] Writing More Expressive Beam Tests

Hi,
sorry for resurrecting such an old thread but are there already thoughts on
how the quiescence handling will work for runner-independent tests?

I was thinking about how to make the RunnableOnService tests work when
executed in "true-streaming" mode, i.e. when the job would normally never
finish? Right now, the tests work because the sources finish at some point
and we verify that the PAssert DoFn sees the correct results. With
streaming runners this "finished" bit is hard to do and I feel that it is
related to the quiescence idea expression in the document.

Cheers,
Aljoscha

On Thu, 31 Mar 2016 at 19:32 Ben Chambers <bc...@google.com.invalid>
wrote:

> On Mon, Mar 28, 2016 at 4:29 PM Robert Bradshaw
> <ro...@google.com.invalid>
> wrote:
>
> > On Fri, Mar 25, 2016 at 4:28 PM, Ben Chambers
> > <bc...@google.com.invalid> wrote:
> > > My only concern is that in the example, you first need to declare all
> the
> > > inputs, then the pipeline to be tested, then all the outputs. This can
> > lead
> > > to tests that are hard to follow, since what you're really testing is
> an
> > > interleaving more like "When these inputs arrive, I get this output.
> Then
> > > when this happens, I get that output. Etc.".
> >
> > +1 to pursuing this direction.
> >
> > > What if instea of returning a PTransform<PBegin, PCollection<Long>> we
> > had
> > > a "TestSource".
> >
> > I think TestSource is a PTransform<PBegin, PCollection<Long>>.
> >
>
> Maybe? If we want it to easily support multiple inputs, maybe you do
> `testSource.getInput(tag)` to get the `PTransform<PBegin, PCollection<T>>`
> associated with a given tag? But yes, I intended the `TestSource` to be
> usable within the pipeline to actually produce the data.
>
> >
> > > so we did something like:
> > >
> > > TestPipeline p = TestPipeline.create();
> > > TestSource source = p.testSource();
> > >
> > > // Set up pipeline reading from source.
> > > PCollection<Long> sum = ...;
> >
> > I'm really curious what the "..." looks like. How are we using the
> source?
> >
>
> Either `p.apply(source)` or `p.apply(source.forTag(tag))`. Not sure about
> naming, of course.
>
> >
> > > BeamAssert sumAssert = BeamAssert.sum();
> >
> > Did you mean BeamAssert.that(sum)?
> >
>
> Almost certainly. Or maybe `BeamAssert.on(sum)`. But something like that.
>
> > // Test for the Speculative Pane
> > > source.addElements(...);
> > > source.advanceWatermark(...);
> > > sumAssert.thatWindowPane(...);
> > >
> > > // Test for the On Time Pane
> > > source.addElements(...)
> > > source.advanceWatermark(...);
> > > sumAssert.thatWindowPane(...);
> > >
> > > etc.
> >
> > Is there a p.run() at the end?
> >
>
> Almost certainly.
>
>
> > > We could also allow TestSource to work with multiple input pipelines
> like
> > > this:
> > >
> > > TestSource<Integer> intSource = p.testSource(new
> > TypeDescriptor<Integer>());
> > > TestSource<Long> longSource = p.testSource(new TypeDescriptor<Long>());
> > > ...
> > > intSource.addElements(...);
> > > longSource.addElements(...);
> > > etc.
> >
> > Would we get at total ordering on the addition of elements/advancement
> > of watermarks across sources by the temporal ordering of these
> > operations in the users program (e.g. by incrementing some global
> > counter)?
> >
>
> Ideally? I was focusing on the interleaving of inputs/assertions, but we
> can talk more about this.
>
>
> > > On Fri, Mar 25, 2016 at 4:08 PM Thomas Groh <tg...@google.com.invalid>
> > > wrote:
> > >
> > >> Hey everyone;
> > >>
> > >> I'd still be happy to get feedback. I'm going to start working on this
> > >> early next week
> > >>
> > >> Thanks,
> > >>
> > >> Thomas
> > >>
> > >> On Mon, Mar 21, 2016 at 5:38 PM, Thomas Groh <tg...@google.com>
> wrote:
> > >>
> > >> > Hey everyone,
> > >> >
> > >> > I've been working on a proposal to expand the capabilities of our
> > testing
> > >> > API, mostly around writing deterministic tests for pipelines that
> have
> > >> > interesting triggering behavior, especially speculative and late
> > >> triggers.
> > >> >
> > >> > I've shared a doc here
> > >> > <
> > >>
> >
> https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit?usp=sharing
> > >
> > >> containing
> > >> > the proposal and some examples, with world comment access + explicit
> > >> > committer edit access. I'd welcome any feedback you all have.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Thomas
> > >> >
> > >>
> >
>

Re: [PROPOSAL] Writing More Expressive Beam Tests

Posted by Aljoscha Krettek <al...@apache.org>.
I see, I haven't thought about watermarks for that but it makes sense. In
Flink, we could just observe watermarks in the sources and shut down
sources when we see a Long.MAX_VALUE watermark. This in turn would bring
down the whole pipeline, starting from the sources.

On Tue, 24 May 2016 at 00:07 Thomas Groh <tg...@google.com.invalid> wrote:

> This is different than the quiescence property proposed in the document -
> quiescence is an idleness property ("the pipeline cannot make progress"),
> but not a completeness property ("the pipeline will never make progress").
>
> However, the existing property of watermarks does mostly solve this problem
> - given that allowed lateness is finite, if all of the root transforms of a
> Pipeline advance their watermarks to positive infinity, the pipeline will
> be complete (as all new inputs are droppably late, so they will be
> dropped). This also causes all windows that contain elements to be closed
> (which will cause the execution of the appropriate PAsserts on those
> windows). This notion of completion is runner-independent; however,
> shutting down the pipeline requires runners to either provide hooks to
> allow users to observe this completed state, or the runner to notice that
> all PTransforms have completed and shut down the pipeline. Notably, this
> notion of completion is simpler than quiescence (as it only requires access
> to the watermarks of the system), so runners can implement it
> independently.
>
> On Mon, May 16, 2016 at 9:00 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > Hi,
> > sorry for resurrecting such an old thread but are there already thoughts
> on
> > how the quiescence handling will work for runner-independent tests?
> >
> > I was thinking about how to make the RunnableOnService tests work when
> > executed in "true-streaming" mode, i.e. when the job would normally never
> > finish? Right now, the tests work because the sources finish at some
> point
> > and we verify that the PAssert DoFn sees the correct results. With
> > streaming runners this "finished" bit is hard to do and I feel that it is
> > related to the quiescence idea expression in the document.
> >
> > Cheers,
> > Aljoscha
> >
> > On Thu, 31 Mar 2016 at 19:32 Ben Chambers <bc...@google.com.invalid>
> > wrote:
> >
> > > On Mon, Mar 28, 2016 at 4:29 PM Robert Bradshaw
> > > <ro...@google.com.invalid>
> > > wrote:
> > >
> > > > On Fri, Mar 25, 2016 at 4:28 PM, Ben Chambers
> > > > <bc...@google.com.invalid> wrote:
> > > > > My only concern is that in the example, you first need to declare
> all
> > > the
> > > > > inputs, then the pipeline to be tested, then all the outputs. This
> > can
> > > > lead
> > > > > to tests that are hard to follow, since what you're really testing
> is
> > > an
> > > > > interleaving more like "When these inputs arrive, I get this
> output.
> > > Then
> > > > > when this happens, I get that output. Etc.".
> > > >
> > > > +1 to pursuing this direction.
> > > >
> > > > > What if instea of returning a PTransform<PBegin, PCollection<Long>>
> > we
> > > > had
> > > > > a "TestSource".
> > > >
> > > > I think TestSource is a PTransform<PBegin, PCollection<Long>>.
> > > >
> > >
> > > Maybe? If we want it to easily support multiple inputs, maybe you do
> > > `testSource.getInput(tag)` to get the `PTransform<PBegin,
> > PCollection<T>>`
> > > associated with a given tag? But yes, I intended the `TestSource` to be
> > > usable within the pipeline to actually produce the data.
> > >
> > > >
> > > > > so we did something like:
> > > > >
> > > > > TestPipeline p = TestPipeline.create();
> > > > > TestSource source = p.testSource();
> > > > >
> > > > > // Set up pipeline reading from source.
> > > > > PCollection<Long> sum = ...;
> > > >
> > > > I'm really curious what the "..." looks like. How are we using the
> > > source?
> > > >
> > >
> > > Either `p.apply(source)` or `p.apply(source.forTag(tag))`. Not sure
> about
> > > naming, of course.
> > >
> > > >
> > > > > BeamAssert sumAssert = BeamAssert.sum();
> > > >
> > > > Did you mean BeamAssert.that(sum)?
> > > >
> > >
> > > Almost certainly. Or maybe `BeamAssert.on(sum)`. But something like
> that.
> > >
> > > > // Test for the Speculative Pane
> > > > > source.addElements(...);
> > > > > source.advanceWatermark(...);
> > > > > sumAssert.thatWindowPane(...);
> > > > >
> > > > > // Test for the On Time Pane
> > > > > source.addElements(...)
> > > > > source.advanceWatermark(...);
> > > > > sumAssert.thatWindowPane(...);
> > > > >
> > > > > etc.
> > > >
> > > > Is there a p.run() at the end?
> > > >
> > >
> > > Almost certainly.
> > >
> > >
> > > > > We could also allow TestSource to work with multiple input
> pipelines
> > > like
> > > > > this:
> > > > >
> > > > > TestSource<Integer> intSource = p.testSource(new
> > > > TypeDescriptor<Integer>());
> > > > > TestSource<Long> longSource = p.testSource(new
> > TypeDescriptor<Long>());
> > > > > ...
> > > > > intSource.addElements(...);
> > > > > longSource.addElements(...);
> > > > > etc.
> > > >
> > > > Would we get at total ordering on the addition of
> elements/advancement
> > > > of watermarks across sources by the temporal ordering of these
> > > > operations in the users program (e.g. by incrementing some global
> > > > counter)?
> > > >
> > >
> > > Ideally? I was focusing on the interleaving of inputs/assertions, but
> we
> > > can talk more about this.
> > >
> > >
> > > > > On Fri, Mar 25, 2016 at 4:08 PM Thomas Groh
> <tgroh@google.com.invalid
> > >
> > > > > wrote:
> > > > >
> > > > >> Hey everyone;
> > > > >>
> > > > >> I'd still be happy to get feedback. I'm going to start working on
> > this
> > > > >> early next week
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Thomas
> > > > >>
> > > > >> On Mon, Mar 21, 2016 at 5:38 PM, Thomas Groh <tg...@google.com>
> > > wrote:
> > > > >>
> > > > >> > Hey everyone,
> > > > >> >
> > > > >> > I've been working on a proposal to expand the capabilities of
> our
> > > > testing
> > > > >> > API, mostly around writing deterministic tests for pipelines
> that
> > > have
> > > > >> > interesting triggering behavior, especially speculative and late
> > > > >> triggers.
> > > > >> >
> > > > >> > I've shared a doc here
> > > > >> > <
> > > > >>
> > > >
> > >
> >
> https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit?usp=sharing
> > > > >
> > > > >> containing
> > > > >> > the proposal and some examples, with world comment access +
> > explicit
> > > > >> > committer edit access. I'd welcome any feedback you all have.
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Thomas
> > > > >> >
> > > > >>
> > > >
> > >
> >
>

Re: [PROPOSAL] Writing More Expressive Beam Tests

Posted by Thomas Groh <tg...@google.com.INVALID>.
This is different than the quiescence property proposed in the document -
quiescence is an idleness property ("the pipeline cannot make progress"),
but not a completeness property ("the pipeline will never make progress").

However, the existing property of watermarks does mostly solve this problem
- given that allowed lateness is finite, if all of the root transforms of a
Pipeline advance their watermarks to positive infinity, the pipeline will
be complete (as all new inputs are droppably late, so they will be
dropped). This also causes all windows that contain elements to be closed
(which will cause the execution of the appropriate PAsserts on those
windows). This notion of completion is runner-independent; however,
shutting down the pipeline requires runners to either provide hooks to
allow users to observe this completed state, or the runner to notice that
all PTransforms have completed and shut down the pipeline. Notably, this
notion of completion is simpler than quiescence (as it only requires access
to the watermarks of the system), so runners can implement it independently.

On Mon, May 16, 2016 at 9:00 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> sorry for resurrecting such an old thread but are there already thoughts on
> how the quiescence handling will work for runner-independent tests?
>
> I was thinking about how to make the RunnableOnService tests work when
> executed in "true-streaming" mode, i.e. when the job would normally never
> finish? Right now, the tests work because the sources finish at some point
> and we verify that the PAssert DoFn sees the correct results. With
> streaming runners this "finished" bit is hard to do and I feel that it is
> related to the quiescence idea expression in the document.
>
> Cheers,
> Aljoscha
>
> On Thu, 31 Mar 2016 at 19:32 Ben Chambers <bc...@google.com.invalid>
> wrote:
>
> > On Mon, Mar 28, 2016 at 4:29 PM Robert Bradshaw
> > <ro...@google.com.invalid>
> > wrote:
> >
> > > On Fri, Mar 25, 2016 at 4:28 PM, Ben Chambers
> > > <bc...@google.com.invalid> wrote:
> > > > My only concern is that in the example, you first need to declare all
> > the
> > > > inputs, then the pipeline to be tested, then all the outputs. This
> can
> > > lead
> > > > to tests that are hard to follow, since what you're really testing is
> > an
> > > > interleaving more like "When these inputs arrive, I get this output.
> > Then
> > > > when this happens, I get that output. Etc.".
> > >
> > > +1 to pursuing this direction.
> > >
> > > > What if instea of returning a PTransform<PBegin, PCollection<Long>>
> we
> > > had
> > > > a "TestSource".
> > >
> > > I think TestSource is a PTransform<PBegin, PCollection<Long>>.
> > >
> >
> > Maybe? If we want it to easily support multiple inputs, maybe you do
> > `testSource.getInput(tag)` to get the `PTransform<PBegin,
> PCollection<T>>`
> > associated with a given tag? But yes, I intended the `TestSource` to be
> > usable within the pipeline to actually produce the data.
> >
> > >
> > > > so we did something like:
> > > >
> > > > TestPipeline p = TestPipeline.create();
> > > > TestSource source = p.testSource();
> > > >
> > > > // Set up pipeline reading from source.
> > > > PCollection<Long> sum = ...;
> > >
> > > I'm really curious what the "..." looks like. How are we using the
> > source?
> > >
> >
> > Either `p.apply(source)` or `p.apply(source.forTag(tag))`. Not sure about
> > naming, of course.
> >
> > >
> > > > BeamAssert sumAssert = BeamAssert.sum();
> > >
> > > Did you mean BeamAssert.that(sum)?
> > >
> >
> > Almost certainly. Or maybe `BeamAssert.on(sum)`. But something like that.
> >
> > > // Test for the Speculative Pane
> > > > source.addElements(...);
> > > > source.advanceWatermark(...);
> > > > sumAssert.thatWindowPane(...);
> > > >
> > > > // Test for the On Time Pane
> > > > source.addElements(...)
> > > > source.advanceWatermark(...);
> > > > sumAssert.thatWindowPane(...);
> > > >
> > > > etc.
> > >
> > > Is there a p.run() at the end?
> > >
> >
> > Almost certainly.
> >
> >
> > > > We could also allow TestSource to work with multiple input pipelines
> > like
> > > > this:
> > > >
> > > > TestSource<Integer> intSource = p.testSource(new
> > > TypeDescriptor<Integer>());
> > > > TestSource<Long> longSource = p.testSource(new
> TypeDescriptor<Long>());
> > > > ...
> > > > intSource.addElements(...);
> > > > longSource.addElements(...);
> > > > etc.
> > >
> > > Would we get at total ordering on the addition of elements/advancement
> > > of watermarks across sources by the temporal ordering of these
> > > operations in the users program (e.g. by incrementing some global
> > > counter)?
> > >
> >
> > Ideally? I was focusing on the interleaving of inputs/assertions, but we
> > can talk more about this.
> >
> >
> > > > On Fri, Mar 25, 2016 at 4:08 PM Thomas Groh <tgroh@google.com.invalid
> >
> > > > wrote:
> > > >
> > > >> Hey everyone;
> > > >>
> > > >> I'd still be happy to get feedback. I'm going to start working on
> this
> > > >> early next week
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Thomas
> > > >>
> > > >> On Mon, Mar 21, 2016 at 5:38 PM, Thomas Groh <tg...@google.com>
> > wrote:
> > > >>
> > > >> > Hey everyone,
> > > >> >
> > > >> > I've been working on a proposal to expand the capabilities of our
> > > testing
> > > >> > API, mostly around writing deterministic tests for pipelines that
> > have
> > > >> > interesting triggering behavior, especially speculative and late
> > > >> triggers.
> > > >> >
> > > >> > I've shared a doc here
> > > >> > <
> > > >>
> > >
> >
> https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit?usp=sharing
> > > >
> > > >> containing
> > > >> > the proposal and some examples, with world comment access +
> explicit
> > > >> > committer edit access. I'd welcome any feedback you all have.
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Thomas
> > > >> >
> > > >>
> > >
> >
>