Posted to dev@beam.apache.org by Demin Alexey <di...@gmail.com> on 2016/11/18 15:40:43 UTC

Flink runner. Wrapper for DoFn

Hi

In the Flink runner we have this code:

https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L262

but in most cases startBundle can be too expensive to run for
every element (for example, opening a DB connection or building a cache).

Why is it so important to invoke startBundle/finishBundle on every
incoming StreamRecord?

Thanks
Alexey Diomin
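A plain-Java sketch of what per-element bundling costs. LifecycleCounter and BundleDriver are hypothetical models, not the Beam or Flink API: they only count how often a bundle-level hook such as startBundle runs when every StreamRecord is its own bundle, compared with grouping elements into larger bundles.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a DoFn-like object. NOT the Beam or Flink API;
// it only counts lifecycle calls.
class LifecycleCounter {
    int startBundleCalls = 0;
    int finishBundleCalls = 0;
    int processed = 0;

    void startBundle() { startBundleCalls++; }    // e.g. open a DB connection
    void processElement(String e) { processed++; }
    void finishBundle() { finishBundleCalls++; }  // e.g. close or flush resources
}

class BundleDriver {
    // Feeds `elements` to `fn`, closing a bundle after every `bundleSize` elements.
    // bundleSize == 1 mimics one bundle per incoming StreamRecord.
    static void run(List<String> elements, int bundleSize, LifecycleCounter fn) {
        if (bundleSize < 1) throw new IllegalArgumentException("bundleSize must be >= 1");
        int inBundle = 0;
        for (String e : elements) {
            if (inBundle == 0) fn.startBundle();
            fn.processElement(e);
            inBundle++;
            if (inBundle == bundleSize) {
                fn.finishBundle();
                inBundle = 0;
            }
        }
        if (inBundle > 0) fn.finishBundle(); // close a trailing partial bundle
    }

    static List<String> makeElements(int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add("element-" + i);
        return out;
    }
}
```

With 1,000 elements, a bundle size of 1 runs startBundle 1,000 times, while a bundle size of 100 runs it 10 times; whatever is built in startBundle is rebuilt that often.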

Re: Flink runner. Wrapper for DoFn

Posted by Aljoscha Krettek <al...@apache.org>.
I think we should change the Flink runner to always enable checkpointing
and have a default checkpointing interval. But this has some tricky
implications, and I'm not yet sure whether it will be possible.

On Sun, 20 Nov 2016 at 13:43 Alexey Demin <di...@gmail.com> wrote:

Hi, Aljoscha

I'm back with a question about the wrapper =)

kafka1 -> transformer1 -> kafka2

I load data from kafka1, split it into 10+ events and push the results
to kafka2.

The processing uses PushbackSideInputDoFnRunner and operator chaining,
but the pushback runner is driven by the streaming wrapper DoFnOperator,
which invokes finishBundle on every element. As a result I get:

1) load from Kafka
2) parse and invoke context.collect(element1)
3) chained call into the Kafka sink
4) finishBundle in the Kafka sink => kafka.flush()
5) context.collect(element2)
6) chained call into the Kafka sink
7) finishBundle in the Kafka sink => kafka.flush()
...
etc.

Do you have an idea how I can prevent flush() on every element? Right now
it's the bottleneck for me.

Thanks,
Alexey Diomin


2016-11-19 11:59 GMT+04:00 Aljoscha Krettek <al...@apache.org>:

@amir, what do you mean? Naming a ParDo "startBundle" is not the same thing
as having a @StartBundle or startBundle() (for OldDoFn) method in your
ParDo.

On Sat, 19 Nov 2016 at 00:22 amir bahmanyari <am...@yahoo.com.invalid>
wrote:

> Interesting. I have been including "startBundle" in KafkaIO() thus
> far. What could happen to Flink cluster performance with the
> following? Thanks, Aljoscha.
> PCollection<KV<String, String>> kafkarecords = p
>     .apply(KafkaIO.read().withBootstrapServers("kafka01:9092").withTopics(topics)
>         .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
>     .apply("startBundle", ParDo.of(new DoFn<KV<byte[], String>, KV<String, String>>() {
> Amir-
>
>       From: Aljoscha Krettek <al...@apache.org>
>  To: amir bahmanyari <am...@yahoo.com>; Eugene Kirpichov <kirpichov@google.com>; "dev@beam.incubator.apache.org" <dev@beam.incubator.apache.org>
>  Sent: Friday, November 18, 2016 2:54 PM
>  Subject: Re: Flink runner. Wrapper for DoFn
>
> Regarding the Flink runner and how it calls startBundle()/finishBundle():
> it's currently done like this because it is correct and because there is
> no other "natural" point where it could be called. Flink continuously
> processes elements and at some (user-defined) interval performs
> checkpoints to persist state. We could call startBundle()/finishBundle()
> when this happens, but I chose not to (for the time being) because this
> could lead to problems if the user sets a rather large interval. Users can
> even disable checkpointing, in which case we would never call these methods.
>
> --
> Aljoscha
>
> On Fri, 18 Nov 2016 at 22:17 amir bahmanyari <am...@yahoo.com.invalid>
> wrote:
>
> > Oops! sorry :-) Thanks Eugene ...
> >
> >      From: Eugene Kirpichov <ki...@google.com>
> >  To: dev@beam.incubator.apache.org; amir bahmanyari <amirtousa@yahoo.com>
> >  Sent: Friday, November 18, 2016 1:09 PM
> >  Subject: Re: Flink runner. Wrapper for DoFn
> >
> > Amir - @Setup is a regular Java annotation; class names in Java (including
> > names of annotation classes), like all other names, are case-sensitive.
> >
> > On Fri, Nov 18, 2016 at 12:54 PM amir bahmanyari
> > <am...@yahoo.com.invalid> wrote:
> >
> > Thanks Alexey. I just fired up the whole thing with @Setup. BTW, does it
> > matter whether it's lowercase @setup or @Setup? I hope not. :-)) Will update
> > you when it's done and share my observations. Cheers + have a great weekend.
> > Amir-
> >
> >      From: Alexey Demin <di...@gmail.com>
> >  To: dev@beam.incubator.apache.org; amir bahmanyari <amirtousa@yahoo.com>
> >  Sent: Friday, November 18, 2016 12:38 PM
> >  Subject: Re: Flink runner. Wrapper for DoFn
> >
> > In my case:
> > 1) I don't rebuild the filter index every time, only once at the start of
> > processing
> > 2) the connection to the remote DB isn't opened hundreds of times per second
> >
> > As a result, the whole pipeline runs more stably and faster.
> >
> > 2016-11-19 0:06 GMT+04:00 amir bahmanyari <am...@yahoo.com.invalid>:
> >
> > > Hi Alexey, what improvements do you expect from replacing @StartBundle
> > > with @Setup? I am going to give it a try & see what difference it
> > > makes. Interesting & thanks for bringing it up...
> > > Cheers
> > >
> > >      From: Demin Alexey <di...@gmail.com>
> > >  To: dev@beam.incubator.apache.org
> > >  Sent: Friday, November 18, 2016 11:12 AM
> > >  Subject: Re: Flink runner. Wrapper for DoFn
> > >
> > > Oh, this was my mistake.
> > >
> > > Yes, the correct way is to use @Setup.
> > >
> > > Thank you Eugene.
> > >
> > >
> > > 2016-11-18 22:54 GMT+04:00 Eugene Kirpichov <kirpichov@google.com.invalid>:
> > >
> > > > Hi Alexey,
> > > >
> > > > In general, things like establishing connections and initializing caches
> > > > are better done in @Setup and @Teardown methods, rather than @StartBundle
> > > > and @FinishBundle, because DoFns can be reused between bundles and this
> > > > way you get more benefit from reuse.
> > > >
> > > > Bundles can be pretty small, especially in streaming pipelines. That said,
> > > > they normally shouldn't be 1-element-small. Hopefully someone working on
> > > > the Flink runner can comment.
> > > >
> > > > On Fri, Nov 18, 2016 at 10:47 AM amir bahmanyari
> > > > <am...@yahoo.com.invalid> wrote:
> > > >
> > > > > Hmmm... Thanks... This could very well be my bottleneck, since I see tons
> > > > > of threads go into the WAIT state after some time & stay like that
> > > > > relatively forever. I have 100 G worth of elements to process... Is there
> > > > > a way to bypass this "startBundle" & get fairly optimized behavior?
> > > > > Anyone? Thanks + regards, Amir-
> > > > >
> > > > >      From: Demin Alexey <di...@gmail.com>
> > > > >  To: dev@beam.incubator.apache.org; amir bahmanyari <amirtousa@yahoo.com>
> > > > >  Sent: Friday, November 18, 2016 10:40 AM
> > > > >  Subject: Re: Flink runner. Wrapper for DoFn
> > > > >
> > > > > Very simple example:
> > > > >
> > > > > In startBundle my DoFn loads filters from a remote DB and builds an
> > > > > optimized index; in processElement it applies the filters to every
> > > > > element to decide whether to push the element to the next operation or
> > > > > drop it.
> > > > >
> > > > > With the current implementation it's like matching a regexp against
> > > > > strings. You have 2 ways:
> > > > > 1) compile the regexp again for every element
> > > > > 2) compile the regexp once and apply it to all elements
> > > > >
> > > > > Right now Flink works the first way, and that way is not optimal.
> > > > >
> > > > >
> > > > > 2016-11-18 22:26 GMT+04:00 amir bahmanyari <amirtousa@yahoo.com.invalid>:
> > > > >
> > > > > > Hi Alexey, "startBundle can be expensive"... Could you elaborate on
> > > > > > "expensive" per element, please?
> > > > > > Thanks
>
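Eugene's advice and Alexey's regexp analogy can be sketched in plain Java. RegexFilterFn and FilterDriver are hypothetical: setup() and startBundle() only mimic the roles of Beam's @Setup and @StartBundle methods, and a compiled java.util.regex.Pattern stands in for the filter index. With one element per bundle, initializing in startBundle recompiles the pattern for every element, while initializing in setup compiles it once for a reused instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical filter "DoFn" in plain Java (not Beam's API). Its expensive state,
// a compiled regex standing in for the filter index, is built either in setup()
// (once per instance) or in startBundle() (once per bundle).
class RegexFilterFn {
    private final String regex;
    private final boolean initPerBundle;
    private Pattern pattern;
    int compileCount = 0;
    final List<String> kept = new ArrayList<>();

    RegexFilterFn(String regex, boolean initPerBundle) {
        this.regex = regex;
        this.initPerBundle = initPerBundle;
    }

    private void init() {
        pattern = Pattern.compile(regex);
        compileCount++;
    }

    void setup()       { if (!initPerBundle) init(); } // plays the role of @Setup
    void startBundle() { if (initPerBundle) init(); }  // plays the role of @StartBundle

    void processElement(String s) {
        if (pattern.matcher(s).matches()) kept.add(s); // keep the element, else drop it
    }

    void finishBundle() { }
}

class FilterDriver {
    // Runs `inputs` through one reused `fn` with one element per bundle.
    static void runOneElementBundles(List<String> inputs, RegexFilterFn fn) {
        fn.setup(); // a reused instance is set up only once
        for (String s : inputs) {
            fn.startBundle();
            fn.processElement(s);
            fn.finishBundle();
        }
    }
}
```

Both variants keep the same elements; only the number of compilations differs (four versus one for four inputs).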

Re: Flink runner. Wrapper for DoFn

Posted by Alexey Demin <di...@gmail.com>.
Hi, Aljoscha

I'm back with a question about the wrapper =)

kafka1 -> transformer1 -> kafka2

I load data from kafka1, split it into 10+ events and push the results
to kafka2.

The processing uses PushbackSideInputDoFnRunner and operator chaining,
but the pushback runner is driven by the streaming wrapper DoFnOperator,
which invokes finishBundle on every element. As a result I get:

1) load from Kafka
2) parse and invoke context.collect(element1)
3) chained call into the Kafka sink
4) finishBundle in the Kafka sink => kafka.flush()
5) context.collect(element2)
6) chained call into the Kafka sink
7) finishBundle in the Kafka sink => kafka.flush()
...
etc.

Do you have an idea how I can prevent flush() on every element? Right now
it's the bottleneck for me.

Thanks,
Alexey Diomin
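The flush-per-element effect can be modeled in plain Java. FakeProducer, SinkFn and SplitAndSink are hypothetical stand-ins (not the Kafka client or Beam API), and splitting a line on ',' stands in for "split into 10+ events". Because the sink flushes in finishBundle, the number of flushes equals the number of bundles.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kafka producer: send() only buffers,
// flush() "ships" the buffer. Not the Kafka client API.
class FakeProducer {
    final List<String> buffer = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();
    int flushCount = 0;

    void send(String record) { buffer.add(record); }

    void flush() {
        flushCount++;
        delivered.addAll(buffer);
        buffer.clear();
    }
}

// Hypothetical sink with a bundle lifecycle: it flushes in finishBundle,
// so flushes == bundles.
class SinkFn {
    final FakeProducer producer;
    SinkFn(FakeProducer producer) { this.producer = producer; }

    void processElement(String record) { producer.send(record); }
    void finishBundle() { producer.flush(); }
}

class SplitAndSink {
    // Splits each input line on ',' and feeds the events to the sink,
    // closing a bundle after every `bundleSize` events.
    static void run(List<String> inputs, int bundleSize, SinkFn sink) {
        if (bundleSize < 1) throw new IllegalArgumentException("bundleSize must be >= 1");
        int inBundle = 0;
        for (String line : inputs) {
            for (String event : line.split(",")) {
                sink.processElement(event);
                if (++inBundle == bundleSize) {
                    sink.finishBundle();
                    inBundle = 0;
                }
            }
        }
        if (inBundle > 0) sink.finishBundle(); // close a trailing partial bundle
    }
}
```

Three input lines of ten events each produce thirty flushes with one-element bundles and a single flush when all events share one bundle; the delivered records are the same either way, only the number of flushes changes.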



Re: Flink runner. Wrapper for DoFn

Posted by Aljoscha Krettek <al...@apache.org>.
@amir, what do you mean? Naming a ParDo "startBundle" is not the same thing
as having a @StartBundle or startBundle() (for OldDoFn) method in your
ParDo.
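The distinction Aljoscha draws is that a runner finds lifecycle methods by their annotation, not by the step name passed to apply(). A plain-Java sketch of annotation-based dispatch via reflection; MyStartBundle, AnnotatedFn and AnnotationRunner are hypothetical and are not Beam's actual invoker.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation playing the role of a start-bundle hook.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface MyStartBundle {}

class AnnotatedFn {
    final List<String> calls = new ArrayList<>();

    @MyStartBundle
    public void prepare() { calls.add("prepare"); }

    // Same name as the hook, but no annotation: annotation-based dispatch ignores it.
    public void startBundle() { calls.add("startBundle"); }
}

class AnnotationRunner {
    // Invokes every public method carrying @MyStartBundle. The step name is only
    // a label and plays no part in dispatch.
    static void startBundle(String stepName, Object fn) {
        for (Method m : fn.getClass().getMethods()) {
            if (m.isAnnotationPresent(MyStartBundle.class)) {
                try {
                    m.invoke(fn);
                } catch (ReflectiveOperationException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
```

Naming the step "startBundle" changes nothing here: only the annotated prepare() runs, and the unannotated method called startBundle() is never invoked.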


Re: Flink runner. Wrapper for DoFn

Posted by amir bahmanyari <am...@yahoo.com.INVALID>.
Interesting. I have been including "startBundle" in KafkaIO() thus far. What could happen to Flink cluster performance with the following? Thanks, Aljoscha.
PCollection<KV<String, String>> kafkarecords = p
    .apply(KafkaIO.read().withBootstrapServers("kafka01:9092").withTopics(topics)
        .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
    .apply("startBundle", ParDo.of(new DoFn<KV<byte[], String>, KV<String, String>>() {
Amir-


Re: Flink runner. Wrapper for DoFn

Posted by Aljoscha Krettek <al...@apache.org>.
Regarding the Flink runner and how it calls startBundle()/finishBundle():
it's currently done like this because it is correct and because there is no
other "natural" point where it could be called. Flink continuously
processes elements and at some (user-defined) interval performs checkpoints
to persist state. We could call startBundle()/finishBundle() when this
happens, but I chose not to (for the time being) because this could lead to
problems if the user sets a rather large interval. Users can even disable
checkpointing, in which case we would never call these methods.

--
Aljoscha
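A plain-Java model of the alternative described above (and not adopted): closing bundles only when a checkpoint fires. CheckpointBundler is hypothetical and looks only at element arrival times; it shows that bundle size grows with the interval and that, with checkpointing disabled, no bundle is ever finished.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a bundle is finished only when a checkpoint fires.
// Not the Flink runner's implementation.
class CheckpointBundler {
    // arrivalMillis must be sorted ascending; intervalMillis <= 0 means
    // checkpointing is disabled. Returns the sizes of the bundles that
    // reached finishBundle.
    static List<Integer> finishedBundles(long[] arrivalMillis, long intervalMillis) {
        List<Integer> sizes = new ArrayList<>();
        if (intervalMillis <= 0) return sizes; // no checkpoints => finishBundle never runs
        long nextCheckpoint = intervalMillis;
        int open = 0;
        for (long t : arrivalMillis) {
            while (t >= nextCheckpoint) { // checkpoint(s) fire before this element
                if (open > 0) {
                    sizes.add(open);
                    open = 0;
                }
                nextCheckpoint += intervalMillis;
            }
            open++;
        }
        return sizes; // elements still counted in `open` wait for the next checkpoint
    }
}
```

With a 250 ms interval, ten elements arriving every 100 ms are finished in bundles of 3, 2 and 3, with 2 still waiting; with a 10 s interval or with checkpointing disabled, none of them reach finishBundle in this window.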

On Fri, 18 Nov 2016 at 22:17 amir bahmanyari <am...@yahoo.com.invalid>
wrote:

> Oops! sorry :-) Thanks Eugene ...

Re: Flink runner. Wrapper for DoFn

Posted by amir bahmanyari <am...@yahoo.com.INVALID>.
Oops! sorry :-) Thanks Eugene ...

      From: Eugene Kirpichov <ki...@google.com>
 To: dev@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Friday, November 18, 2016 1:09 PM
 Subject: Re: Flink runner. Wrapper for DoFn
   
Amir - @Setup is a regular Java annotation; class names in Java (including names of annotation classes), like all other names, are case-sensitive.

Re: Flink runner. Wrapper for DoFn

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Amir - @Setup is a regular Java annotation
<http://docs.oracle.com/javase/1.5.0/docs/guide/language/annotations.html>;
class names in Java (including names of annotation classes), like all other
names, are case-sensitive.
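Eugene's point can be checked directly: an annotation is just a type, so a framework that discovers lifecycle methods by reflection matches the exact type name. A minimal sketch, where Setup is a hypothetical annotation defined locally (not Beam's DoFn.Setup):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class AnnotationLookup {

  /** Hypothetical stand-in for a lifecycle annotation; its name is matched exactly. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface Setup {}

  static class MyFn {
    @Setup
    public void openConnection() {}

    // Writing "@setup" here would not compile: no annotation type named "setup" exists.
    public void notALifecycleMethod() {}
  }

  /** Returns the names of methods carrying @Setup, the way an annotation-driven framework might find them. */
  static List<String> setupMethods(Class<?> fnClass) {
    List<String> names = new ArrayList<>();
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(Setup.class)) {
        names.add(m.getName());
      }
    }
    return names;
  }

  public static void main(String[] args) {
    System.out.println(setupMethods(MyFn.class)); // prints [openConnection]
  }
}
```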

On Fri, Nov 18, 2016 at 12:54 PM amir bahmanyari
<am...@yahoo.com.invalid> wrote:

> Thanks Alexey.I just fired up the whole thing. With @Setup. BTW, does it
> matter if lowercase @setup or @Setup?I hope not. :-))Will update you when
> its done and share my observations.Cheers+have a great weekend.Amir-

Re: Flink runner. Wrapper for DoFn

Posted by amir bahmanyari <am...@yahoo.com.INVALID>.
Thanks Alexey. I just fired up the whole thing with @Setup. BTW, does it matter whether it's lowercase @setup or @Setup? I hope not. :-) Will update you when it's done and share my observations. Cheers, and have a great weekend. Amir

      From: Alexey Demin <di...@gmail.com>
 To: dev@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Friday, November 18, 2016 12:38 PM
 Subject: Re: Flink runner. Wrapper for DoFn
   
In my case it's:
1) i don't rebuild index by filters every time, only one time on start
processing
2) connection for remote db does not open hundreds times in second

as result all pipeline work more stable and faster

Re: Flink runner. Wrapper for DoFn

Posted by Alexey Demin <di...@gmail.com>.
In my case:
1) I don't rebuild the filter index every time, only once at the start of
processing
2) the connection to the remote DB is not opened hundreds of times per second

As a result, the whole pipeline works more stably and faster.
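A minimal sketch of the "build once, apply to every element" pattern in plain Java, using the regexp analogy from this thread. FilterFn, setup() and processElement() are hypothetical names standing in for a DoFn with @Setup and @ProcessElement methods, and loading filters from a remote DB is replaced by a fixed expression:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class FilterFnSketch {

  /** Hypothetical filter whose expensive state (a compiled regex) is built in setup(). */
  static final class FilterFn {
    private final String filterExpression;
    private Pattern compiled; // built once per instance, reused across bundles
    int compilations = 0;

    FilterFn(String filterExpression) { this.filterExpression = filterExpression; }

    /** Analogue of a @Setup method: runs once per instance, not once per bundle. */
    void setup() {
      compiled = Pattern.compile(filterExpression);
      compilations++;
    }

    /** Analogue of @ProcessElement: keep matching elements, drop the rest. */
    void processElement(String element, List<String> out) {
      if (compiled.matcher(element).matches()) {
        out.add(element);
      }
    }
  }

  public static void main(String[] args) {
    FilterFn fn = new FilterFn("user-\\d+");
    fn.setup();
    List<String> out = new ArrayList<>();
    // Even if the runner wraps each element in its own bundle, setup() is not repeated.
    for (String e : new String[] {"user-1", "admin", "user-42", "user-x"}) {
      fn.processElement(e, out);
    }
    System.out.println(out + ", compilations=" + fn.compilations); // [user-1, user-42], compilations=1
  }
}
```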

2016-11-19 0:06 GMT+04:00 amir bahmanyari <am...@yahoo.com.invalid>:

> Hi Alexey,What improvements do you expect by replacing @StartBundle
> with @Setup?I am going to give it a try & see what diff it
> makes.Interesting & thanks for bringing it up...
> Cheers

Re: Flink runner. Wrapper for DoFn

Posted by amir bahmanyari <am...@yahoo.com.INVALID>.
Hi Alexey, what improvements do you expect from replacing @StartBundle with @Setup? I am going to give it a try and see what difference it makes. Interesting, and thanks for bringing it up...
Cheers

      From: Demin Alexey <di...@gmail.com>
 To: dev@beam.incubator.apache.org 
 Sent: Friday, November 18, 2016 11:12 AM
 Subject: Re: Flink runner. Wrapper for DoFn
   
Oh, this is my mistake

Yes correct way its use @Setup.

Thank you Eugene.


Re: Flink runner. Wrapper for DoFn

Posted by Thomas Groh <tg...@google.com.INVALID>.
I'd also like to comment on when you would use StartBundle/FinishBundle versus
Setup/Teardown. Generally, StartBundle/FinishBundle is about processing
behavior and correctness, while Setup/Teardown is about managing
persistent resources (like the connection in your case).

To be specific, FinishBundle must be called before any work is committed,
and in it you must flush any state based on the elements processed within
the bundle. This ensures that when the input bundle is checkpointed, any
state is committed and any operations performed based on the input elements
have been persisted. If this is not done and an input is checkpointed, no
DoFn will see those input elements again, which means that if a worker
crashes, or Teardown is not called for any reason, the state generated by
those elements will be permanently lost.
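The failure mode described above can be modeled in a few lines of plain Java. BufferedSink and runBundleThenCrash are hypothetical; the sketch assumes the runner treats a bundle's inputs as done once the checkpoint commits, so anything still buffered when the worker crashes is gone for good:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of why FinishBundle must flush before the runner commits a checkpoint. */
public class FlushBeforeCheckpoint {

  /** Hypothetical buffered writer: durable records survive a crash, buffered ones do not. */
  static final class BufferedSink {
    final List<Integer> durable = new ArrayList<>();
    final List<Integer> buffer = new ArrayList<>();

    void write(int record) { buffer.add(record); }
    void flush() { durable.addAll(buffer); buffer.clear(); }
    void crash() { buffer.clear(); }
  }

  /**
   * Processes one bundle, lets the runner checkpoint it (so those inputs are never
   * re-delivered), then simulates a worker crash before Teardown can run.
   * Returns what the external system durably holds afterwards.
   */
  static List<Integer> runBundleThenCrash(List<Integer> bundle, boolean flushInFinishBundle) {
    BufferedSink sink = new BufferedSink();
    for (int x : bundle) {
      sink.write(x);        // @ProcessElement
    }
    if (flushInFinishBundle) {
      sink.flush();         // @FinishBundle, before the checkpoint commits
    }
    // Checkpoint committed here: the inputs in `bundle` count as done.
    sink.crash();           // worker dies; no replay, no Teardown
    return sink.durable;
  }

  public static void main(String[] args) {
    List<Integer> bundle = Arrays.asList(1, 2, 3);
    System.out.println("flush in FinishBundle: " + runBundleThenCrash(bundle, true));
    System.out.println("no flush:              " + runBundleThenCrash(bundle, false));
  }
}
```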

KafkaIO is an excellent example of both of these patterns:
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1332

- In @Setup, we construct a producer, which we can reuse for the life of
  the Fn.
- In @ProcessElement, we write a sequence of elements to the producer.
- In @FinishBundle, we flush the producer, clearing out any state. If the
  input elements are never seen again, the elements will still be reflected
  in Kafka.
- In @Teardown, we close the producer and free any associated resources.
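A plain-Java sketch of the same four-step split, with a hypothetical FakeProducer that records calls instead of talking to Kafka. This is not the KafkaIO code, only the lifecycle shape described above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WriterLifecycle {

  /** Hypothetical stand-in for a Kafka producer; records calls instead of sending. */
  static final class FakeProducer {
    final List<String> events;
    FakeProducer(List<String> events) { this.events = events; events.add("open"); }
    void send(String record) { events.add("send:" + record); }
    void flush() { events.add("flush"); }
    void close() { events.add("close"); }
  }

  /** Writer mirroring the Setup / ProcessElement / FinishBundle / Teardown split. */
  static final class WriterFn {
    final List<String> events = new ArrayList<>();
    private FakeProducer producer;

    void setup() { producer = new FakeProducer(events); }  // once per instance
    void processElement(String r) { producer.send(r); }     // per element
    void finishBundle() { producer.flush(); }               // per bundle
    void teardown() { producer.close(); producer = null; }  // once per instance
  }

  /** Drives one WriterFn through several bundles, the way a runner would reuse it. */
  static List<String> run(List<List<String>> bundles) {
    WriterFn fn = new WriterFn();
    fn.setup();
    for (List<String> bundle : bundles) {
      for (String r : bundle) {
        fn.processElement(r);
      }
      fn.finishBundle();
    }
    fn.teardown();
    return fn.events;
  }

  public static void main(String[] args) {
    List<List<String>> bundles = new ArrayList<>();
    bundles.add(Arrays.asList("a", "b"));
    bundles.add(Arrays.asList("c"));
    // prints [open, send:a, send:b, flush, send:c, flush, close]
    System.out.println(run(bundles));
  }
}
```

If every bundle holds a single element, the log shows a flush after every send, which is the per-element kafka.flush() cost raised elsewhere in this thread.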

On Fri, Nov 18, 2016 at 11:12 AM, Demin Alexey <di...@gmail.com> wrote:

> Oh, this is my mistake
>
> Yes correct way its use @Setup.
>
> Thank you Eugene.

Re: Flink runner. Wrapper for DoFn

Posted by Demin Alexey <di...@gmail.com>.
Oh, this is my mistake.

Yes, the correct way is to use @Setup.

Thank you, Eugene.


2016-11-18 22:54 GMT+04:00 Eugene Kirpichov <ki...@google.com.invalid>:

> Hi Alexey,
>
> In general, things like establishing connections and initializing caches
> are better done in @Setup and @TearDown methods, rather than @StartBundle
> and @FinishBundle, because DoFn's can be reused between bundles and this
> way you get more benefit from reuse.
>
> Bundles can be pretty small, especially in streaming pipelines. That said,
> they normally shouldn't be 1-element-small. Hopefully someone working on
> the Flink runner can comment.

Re: Flink runner. Wrapper for DoFn

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Hi Alexey,

In general, things like establishing connections and initializing caches
are better done in @Setup and @Teardown methods, rather than @StartBundle
and @FinishBundle, because DoFns can be reused between bundles and this
way you get more benefit from reuse.

Bundles can be pretty small, especially in streaming pipelines. That said,
they normally shouldn't be 1-element-small. Hopefully someone working on
the Flink runner can comment.
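A rough arithmetic model of why placement matters when bundles are tiny. It assumes a single DoFn instance reused across bundles; a real runner may create several instances (for example one per parallel worker), so "once per instance" is not "once per job". The class and method names are hypothetical:

```java
/** Plain-Java arithmetic model: how often one reused DoFn instance opens a connection. */
public class ResourcePlacement {

  /**
   * Counts connection opens for {@code elements} elements split into bundles of
   * {@code bundleSize}, all handled by a single DoFn instance reused across bundles.
   */
  static int connectionsOpened(int elements, int bundleSize, boolean openInSetup) {
    int opened = 0;
    if (openInSetup) {
      opened++;          // @Setup: once per DoFn instance
    }
    for (int start = 0; start < elements; start += bundleSize) {
      if (!openInSetup) {
        opened++;        // @StartBundle: once per bundle
      }
      // ... process up to bundleSize elements using the open connection ...
    }
    return opened;
  }

  public static void main(String[] args) {
    System.out.println(connectionsOpened(10_000, 1, false));     // 10000
    System.out.println(connectionsOpened(10_000, 1_000, false)); // 10
    System.out.println(connectionsOpened(10_000, 1, true));      // 1
  }
}
```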

On Fri, Nov 18, 2016 at 10:47 AM amir bahmanyari
<am...@yahoo.com.invalid> wrote:

> Hmmm...Thanks...This could very well be my bottleneck since I see tons of
> threads get on WAIT state after sometime& stay like that relatively
> forever.I have a 100 G worth of elements to process...........Is there a
> way to bypass this "startBundle" & get a fairly optimized
> behavior?Anyone? Thanks+regardsAmir-

Re: Flink runner. Wrapper for DoFn

Posted by amir bahmanyari <am...@yahoo.com.INVALID>.
Hmm... Thanks. This could very well be my bottleneck, since I see tons of threads go into the WAIT state after some time and stay like that more or less forever. I have 100 G worth of elements to process. Is there a way to bypass this "startBundle" and get reasonably optimized behavior? Anyone?
Thanks + regards,
Amir

      From: Demin Alexey <di...@gmail.com>
 To: dev@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Friday, November 18, 2016 10:40 AM
 Subject: Re: Flink runner. Wrapper for DoFn
   
Very simple example:

In startBundle, my DoFn loads filters from a remote DB and builds an
optimized index; in processElement it applies the filters to every element
to decide whether to push the element to the next operation or drop it.

With the current implementation, it's like matching a regexp against
strings. You have two ways:
1) compile the regexp again for every element
2) compile the regexp once and apply it to all elements

Right now Flink works the first way, and that is not optimal.



Re: Flink runner. Wrapper for DoFn

Posted by Demin Alexey <di...@gmail.com>.
Very simple example:

In startBundle, my DoFn loads filters from a remote DB and builds an
optimized index; in processElement it applies the filters to every element
to decide whether to push the element to the next operation or drop it.

With the current implementation, it's like matching a regexp against
strings. You have two ways:
1) compile the regexp again for every element
2) compile the regexp once and apply it to all elements

Right now Flink works the first way, and that is not optimal.
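The regexp analogy can be made concrete with a small plain-Java sketch (JDK only, no Beam). `String.matches` compiles the pattern on every call, which corresponds to way 1; compiling a `Pattern` once and reusing it corresponds to way 2. The class and method names are illustrative; in a Beam DoFn the one-time compile would typically go in a @Setup method.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class RegexReuse {

  // Way 1: String.matches delegates to Pattern.matches, which compiles
  // the regex again for every element.
  static List<String> filterPerElement(List<String> input, String regex) {
    return input.stream()
        .filter(s -> s.matches(regex))
        .collect(Collectors.toList());
  }

  // Way 2: compile once and reuse the same Pattern for all elements.
  static List<String> filterCompiledOnce(List<String> input, String regex) {
    Pattern p = Pattern.compile(regex);
    return input.stream()
        .filter(s -> p.matcher(s).matches())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> events = List.of("user:42", "bot:7", "user:x", "user:1001");
    String regex = "user:\\d+";
    System.out.println(filterPerElement(events, regex));
    System.out.println(filterCompiledOnce(events, regex));
  }
}
```

Both methods keep the same elements; only the amount of repeated compile work differs, and that gap grows with the number of elements.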


2016-11-18 22:26 GMT+04:00 amir bahmanyari <am...@yahoo.com.invalid>:

> Hi Alexey, "startBundle can be expensive"... Could you elaborate on what
> makes it "expensive" per element, please?
> Thanks

Re: Flink runner. Wrapper for DoFn

Posted by amir bahmanyari <am...@yahoo.com.INVALID>.
Hi Alexey, "startBundle can be expensive"... Could you elaborate on what makes it "expensive" per element, please?
Thanks

      From: Demin Alexey <di...@gmail.com>
 To: dev@beam.incubator.apache.org 
 Sent: Friday, November 18, 2016 7:40 AM
 Subject: Flink runner. Wrapper for DoFn
   
Hi

In the Flink runner we have this code:

https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L262

but in most cases the startBundle method can be expensive to run for
every element (for example, opening a DB connection, building a cache, etc.).

Why is it so important to invoke startBundle/finishBundle on every
incoming StreamRecord?

Thanks
Alexey Diomin