Posted to user@beam.apache.org by ed...@gmail.com on 2018/07/03 17:37:53 UTC

Go SDK: Teardown() not being called with dataflow runner

Essentially I have the following code:

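type Writer struct {
  Pool WriterPool
}

// Setup creates the pool once per DoFn instance.
func (w *Writer) Setup() {
  w.Pool = Init()
}

// ProcessElement buffers each element in the pool.
func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
  w.Pool.Add(elem)
}

// Teardown flushes the buffered elements and closes the pool.
func (w *Writer) Teardown() {
  w.Pool.Write()
  w.Pool.Close()
}

beam.ParDo0(scope, &Writer{}, elemCollection)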

The above code runs fine with the direct runner but not with Dataflow.

I added log lines to the above methods, and the ones in Teardown() never appear in the logs.
If I change my code as follows:

func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
  w.Pool.Add(elem)
  w.Pool.Write()
}

Then I see the data being written, but I lose the ability to pool, plus I am leaking connections.

Is this a known issue, or am I doing something wrong?

Thanks again for the help.

Re: Go SDK: Teardown() not being called with dataflow runner

Posted by Eugene Kirpichov <ki...@google.com>.
Hi Eduardo,
These differences are described in the link I sent
(https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L465-L666),
which documents what kind of work is best done in each method. Please let
me know if something is still unclear.

On Tue, Jul 3, 2018 at 1:35 PM eduardo.morales@gmail.com <
eduardo.morales@gmail.com> wrote:

> FinishBundle() does the job.
>
> Should I keep using Setup()? What is the difference between Setup() and
> StartBundle()?
>
> Thanks again.

Re: Go SDK: Teardown() not being called with dataflow runner

Posted by ed...@gmail.com.
FinishBundle() does the job.

Should I keep using Setup()? What is the difference between Setup() and StartBundle()?

Thanks again.

On 2018/07/03 20:10:21, Henning Rohde <he...@google.com> wrote: 
> Teardown has very loose guarantees on when it's called and you essentially
> can't rely on it. Currently, for Go on non-direct runners, we hang on to
> the bundle descriptors forever and never destroy them (and in turn never
> call Teardown). Even if we didn't, failures/restarts could cause Teardown
> to not be called.
> 
> If something _must_ happen, FinishBundle is the right method.
> 
> Thanks,
>  Henning

Re: Go SDK: Teardown() not being called with dataflow runner

Posted by Eugene Kirpichov <ki...@google.com>.
Bundle boundaries are unspecified: they depend on the runner and on the
particular circumstances of a given execution, and are generally unrelated
to windowing or to the data contents themselves. They have no semantic
meaning: everything would still work exactly the same way even if every
element were in its own bundle, or if the entire contents of a PCollection
were in a single bundle. Bundle boundaries are only a way for the runner to
tell you where you are allowed to amortize work. For practical purposes,
all you need to know is: "if you do batching, flush it in FinishBundle".

On Tue, Jul 3, 2018 at 2:29 PM eduardo.morales@gmail.com <
eduardo.morales@gmail.com> wrote:

> Thanks. It is much clearer now...
>
> However, the code comments don't mention how often {Start,Finish}Bundle
> are called. What constitutes a batch?
>
> If I am using a window of 1 minute, can I expect {Start,Finish}Bundle to
> be called every minute? In other words, will the window produce a batch of
> my data?

Re: Go SDK: Teardown() not being called with dataflow runner

Posted by ed...@gmail.com.
Thanks. It is much clearer now...

However, the code comments don't mention how often {Start,Finish}Bundle are called. What constitutes a batch?

If I am using a window of 1 minute, can I expect {Start,Finish}Bundle to be called every minute? In other words, will the window produce a batch of my data?

On 2018/07/03 20:31:56, Eugene Kirpichov <ki...@google.com> wrote: 
> Hi Eduardo,
> Henning is right - the specific guarantees around Setup/Teardown vs.
> StartBundle/FinishBundle are currently described best in the Java SDK
> documentation:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
> (see the documentation of @Setup, @Teardown, @StartBundle and
> @FinishBundle).
> For what you are doing, StartBundle/FinishBundle is 100% the proper
> abstraction - in the current code you are going to see data loss or
> corruption because the code is violating the fundamental guarantee that by
> the time FinishBundle returns, all work associated with the bundle must be
> completed. Pooling or batching mutations is in fact the primary use case
> for Start/FinishBundle. Setup/Teardown are for managing volatile resources
> like connections.

Re: Go SDK: Teardown() not being called with dataflow runner

Posted by Eugene Kirpichov <ki...@google.com>.
Hi Eduardo,
Henning is right - the specific guarantees around Setup/Teardown vs.
StartBundle/FinishBundle are currently described best in the Java SDK
documentation:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
(see the documentation of @Setup, @Teardown, @StartBundle and
@FinishBundle).
For what you are doing, StartBundle/FinishBundle is 100% the proper
abstraction - in the current code you are going to see data loss or
corruption because the code is violating the fundamental guarantee that by
the time FinishBundle returns, all work associated with the bundle must be
completed. Pooling or batching mutations is in fact the primary use case
for Start/FinishBundle. Setup/Teardown are for managing volatile resources
like connections.

On Tue, Jul 3, 2018 at 1:10 PM Henning Rohde <he...@google.com> wrote:

> Teardown has very loose guarantees on when it's called and you essentially
> can't rely on it. Currently, for Go on non-direct runners, we hang on to
> the bundle descriptors forever and never destroy them (and in turn never
> call Teardown). Even if we didn't, failures/restarts could cause Teardown
> to not be called.
>
> If something _must_ happen, FinishBundle is the right method.
>
> Thanks,
>  Henning

Re: Go SDK: Teardown() not being called with dataflow runner

Posted by Henning Rohde <he...@google.com>.
Teardown has very loose guarantees on when it's called and you essentially
can't rely on it. Currently, for Go on non-direct runners, we hang on to
the bundle descriptors forever and never destroy them (and in turn never
call Teardown). Even if we didn't, failures/restarts could cause Teardown
to not be called.

If something _must_ happen, FinishBundle is the right method.

Thanks,
 Henning

On Tue, Jul 3, 2018 at 10:37 AM eduardo.morales@gmail.com <
eduardo.morales@gmail.com> wrote:

> Essentially I have the following code:
>
> type Writer struct {
>   Pool WriterPool
> }
>
> // Setup creates the pool once per DoFn instance.
> func (w *Writer) Setup() {
>   w.Pool = Init()
> }
>
> // ProcessElement buffers each element in the pool.
> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
>   w.Pool.Add(elem)
> }
>
> // Teardown flushes the buffered elements and closes the pool.
> func (w *Writer) Teardown() {
>   w.Pool.Write()
>   w.Pool.Close()
> }
>
> beam.ParDo0(scope, &Writer{}, elemCollection)
>
> The above code runs fine with the direct runner but not with Dataflow.
>
> I added log lines to the above methods, and the ones in Teardown() never
> appear in the logs.
> If I change my code as follows:
>
> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
>   w.Pool.Add(elem)
>   w.Pool.Write()
> }
>
> Then I see the data being written, but I lose the ability to pool, plus I
> am leaking connections.
>
> Is this a known issue, or am I doing something wrong?
>
> Thanks again for the help.
>