You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Thomas Weise <th...@apache.org> on 2018/09/28 00:54:51 UTC

Portable Flink runner: Generator source for testing

There were a few discussions how we can facilitate testing for portable
streaming pipelines with the Flink runner. The problem is that we currently
don't have streaming sources in the Python SDK.

One way to support testing could be a generator that extends the idea of
Impulse to provide a Flink native trigger transform, optionally
parameterized with an interval and max count.

Test pipelines could then follow the generator with a Map function that
creates whatever payloads are desirable.

Thoughts?

Thanks,
Thomas

Re: Portable Flink runner: Generator source for testing

Posted by Micah Wylde <mw...@lyft.com>.
I've opened a JIRA for adding the generator source (BEAM-5707) and sent out
a very rough PR (https://github.com/apache/beam/pull/6637). Would
appreciate any feedback.

On Mon, Oct 8, 2018 at 9:43 AM, Thomas Weise <th...@apache.org> wrote:

> The portable runner does not support metrics yet: https://s.apache.org/
> apache-beam-portability-support-table
>
> There is also no JIRA referenced in the table, would be good to
> locate/create it.
>
> On Mon, Oct 8, 2018 at 9:11 AM Łukasz Gajowy <lu...@gmail.com>
> wrote:
>
>> Does anyone know what is the status of metrics support for Flink Portable
>> Runner? I think we need them to be used in such tests to at least collect
>> time metric that does not contain cluster warm up time, staging resources
>> time and other things that can disturb the actual run time metric. We
>> probably should use the metrics API in some other places (as described in
>> the above-mentioned proposal).
>>
>>
>>
>> pon., 8 paź 2018 o 12:12 Maximilian Michels <mx...@apache.org> napisał(a):
>>
>>> This is correct. However, the example code is only part of Lyft's code
>>> base. Until timer support is done, we would have to do something similar
>>> in our code base.
>>>
>>> On 08.10.18 02:34, Łukasz Gajowy wrote:
>>> > Hi,
>>> >
>>> > just to clarify, judging from the above snippets: it seems that we are
>>> > able now to run tests that use a native source for data generation and
>>> > use them in this form until the Timers are supported. When Timers are
>>> > there, we should consider switching to the Impulse + PTransform based
>>> > solution (described above) because it's more portable - the current is
>>> > dedicated to Flink only (which still is really cool). Is this correct
>>> or
>>> > am I missing something?
>>> >
>>> > Łukasz
>>> >
>>> > pt., 5 paź 2018 o 14:04 Maximilian Michels <mxm@apache.org
>>> > <ma...@apache.org>> napisał(a):
>>> >
>>> >     Thanks for sharing your setup. You're right that we need timers to
>>> >     continuously ingest data to the testing pipeline.
>>> >
>>> >     Here is the Flink source which generates the data:
>>> >     https://github.com/mwylde/beam/commit/
>>> 09c62991773c749bc037cc2b6044896e2d34988a#diff-
>>> b2fc8d680d9c1da86ba23345f3bc83d4R42
>>> >
>>> >     On 04.10.18 19:31, Thomas Weise wrote:
>>> >      > FYI here is an example with native generator for portable Flink
>>> >     runner:
>>> >      >
>>> >      > https://github.com/mwylde/beam/tree/micah_memory_leak
>>> >      >
>>> >     https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda06
>>> 36ca07e101/sdks/python/apache_beam/examples/streaming_leak.py
>>> >      >
>>> >      > You can use it to run the portable Flink runner in streaming
>>> mode
>>> >      > continuously for testing purposes.
>>> >      >
>>> >      >
>>> >      > On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <thw@apache.org
>>> >     <ma...@apache.org>
>>> >      > <mailto:thw@apache.org <ma...@apache.org>>> wrote:
>>> >      >
>>> >      >
>>> >      >
>>> >      >     On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels
>>> >     <mxm@apache.org <ma...@apache.org>
>>> >      >     <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
>>> >      >
>>> >      >          > and then have Flink manage the parallelism for stages
>>> >      >         downstream from that?@Pablo Can you clarify what you
>>> mean
>>> >     by that?
>>> >      >
>>> >      >         Let me paraphrase this just to get a clear
>>> understanding.
>>> >     There
>>> >      >         are two
>>> >      >         approaches to test portable streaming pipelines:
>>> >      >
>>> >      >         a) Use an Impulse followed by a test PTransform which
>>> >     generates
>>> >      >         testing
>>> >      >         data. This is similar to how streaming sources work
>>> which
>>> >     don't
>>> >      >         use the
>>> >      >         Read Transform. For basic testing this should work, even
>>> >     without
>>> >      >         support
>>> >      >         for Timers.
>>> >      >
>>> >      >
>>> >      >     AFAIK this works for bounded sources and batch mode of the
>>> Flink
>>> >      >     runner (staged execution).
>>> >      >
>>> >      >     For streaming we need small bundles, we cannot have a Python
>>> >     ParDo
>>> >      >     block to emit records periodically.
>>> >      >
>>> >      >     (With timers, the ParDo wouldn't block but instead schedule
>>> >     itself
>>> >      >     as needed.)
>>> >      >
>>> >      >         b) Introduce a new URN which gets translated to a native
>>> >      >         Flink/Spark/xy
>>> >      >         testing transform.
>>> >      >
>>> >      >         We should go for a) as this will make testing easier
>>> across
>>> >      >         portable
>>> >      >         runners. We previously discussed native transforms will
>>> be an
>>> >      >         option in
>>> >      >         Beam, but it would be preferable to leave them out of
>>> testing
>>> >      >         for now.
>>> >      >
>>> >      >         Thanks,
>>> >      >         Max
>>> >      >
>>> >      >
>>> >      >         On 28.09.18 21:14, Thomas Weise wrote:
>>> >      >          > Thanks for sharing the link, this looks very
>>> promising!
>>> >      >          >
>>> >      >          > For the synthetic source, if we need a runner native
>>> >     trigger
>>> >      >         mechanism,
>>> >      >          > then it should probably just emit an empty byte
>>> array like
>>> >      >         the impulse
>>> >      >          > implementation does, and everything else could be
>>> left
>>> >     to SDK
>>> >      >         specific
>>> >      >          > transforms that are downstream. We don't have
>>> support for
>>> >      >         timers in the
>>> >      >          > portable Flink runner yet. With timers, there would
>>> not be
>>> >      >         the need for
>>> >      >          > a runner native URN and it could work just like Pablo
>>> >     described.
>>> >      >          >
>>> >      >          >
>>> >      >          > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy
>>> >      >         <lukasz.gajowy@gmail.com <mailto:lukasz.gajowy@gmail.
>>> com>
>>> >     <mailto:lukasz.gajowy@gmail.com <ma...@gmail.com>>
>>> >      >          > <mailto:lukasz.gajowy@gmail.com
>>> >     <ma...@gmail.com>
>>> >      >         <mailto:lukasz.gajowy@gmail.com
>>> >     <ma...@gmail.com>>>> wrote:
>>> >      >          >
>>> >      >          >     Hi all,
>>> >      >          >
>>> >      >          >     thank you, Thomas, for starting this discussion
>>> >     and Pablo for
>>> >      >          >     sharing the ideas. FWIW adding here, we discussed
>>> >     this in
>>> >      >         terms of
>>> >      >          >     Core Beam Transform Load Tests that we are
>>> working on
>>> >      >         right now [1].
>>> >      >          >     If generating synthetic data will be possible for
>>> >      >         portable streaming
>>> >      >          >     pipelines, we could use it in our work to test
>>> Python
>>> >      >         streaming
>>> >      >          >     scenarios.
>>> >      >          >
>>> >      >          >     [1] _https://s.apache.org/GVMa_
>>> >      >          >
>>> >      >          >     pt., 28 wrz 2018 o 08:18 Pablo Estrada
>>> >      >         <pabloem@google.com <ma...@google.com>
>>> >     <mailto:pabloem@google.com <ma...@google.com>>
>>> >      >          >     <mailto:pabloem@google.com
>>> >     <ma...@google.com> <mailto:pabloem@google.com
>>> >     <ma...@google.com>>>>
>>> >      >         napisał(a):
>>> >      >          >
>>> >      >          >         Hi Thomas, all,
>>> >      >          >         yes, this is quite important for testing, and
>>> >     in fact
>>> >      >         I'd think
>>> >      >          >         it's important to streamline the insertion of
>>> >     native
>>> >      >         sources
>>> >      >          >         from different runners to make the current
>>> runners
>>> >      >         more usable.
>>> >      >          >         But that's another topic.
>>> >      >          >
>>> >      >          >         For generators of synthetic data, I had a
>>> couple
>>> >      >         ideas (and this
>>> >      >          >         will show my limited knowledge about Flink
>>> and
>>> >      >         Streaming, but oh
>>> >      >          >         well):
>>> >      >          >
>>> >      >          >         - Flink experts: Is it possible to add a
>>> pure-Beam
>>> >      >         generator
>>> >      >          >         that will do something like: Impulse ->
>>> >      >         ParDo(generate multiple
>>> >      >          >         elements) -> Forced "Write" to Flink (e.g.
>>> >     something
>>> >      >         like a
>>> >      >          >         reshuffle), and then have Flink manage the
>>> >      >         parallelism for
>>> >      >          >         stages downstream from that?
>>> >      >          >
>>> >      >          >         - If this is not possible, it may be worth
>>> >     writing some
>>> >      >          >         transform in Flink / other runners that can
>>> be
>>> >      >         plugged in by
>>> >      >          >         inserting a custom URN. In fact, it may be a
>>> >     good idea to
>>> >      >          >         streamline the insertion of native sources
>>> for
>>> >     each
>>> >      >         runner based
>>> >      >          >         on some sort of CustomURNTransform() ?
>>> >      >          >
>>> >      >          >         I hope I did not butcher those explanations
>>> >     too badly...
>>> >      >          >         Best
>>> >      >          >         -P.
>>> >      >          >
>>> >      >          >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise
>>> >      >         <thw@apache.org <ma...@apache.org>
>>> >     <mailto:thw@apache.org <ma...@apache.org>>
>>> >      >          >         <mailto:thw@apache.org <mailto:
>>> thw@apache.org>
>>> >     <mailto:thw@apache.org <ma...@apache.org>>>> wrote:
>>> >      >          >
>>> >      >          >             There were a few discussions how we can
>>> >      >         facilitate testing
>>> >      >          >             for portable streaming pipelines with the
>>> >     Flink
>>> >      >         runner. The
>>> >      >          >             problem is that we currently don't have
>>> >     streaming
>>> >      >         sources in
>>> >      >          >             the Python SDK.
>>> >      >          >
>>> >      >          >             One way to support testing could be a
>>> >     generator
>>> >      >         that extends
>>> >      >          >             the idea of Impulse to provide a Flink
>>> >     native trigger
>>> >      >          >             transform, optionally parameterized with
>>> an
>>> >      >         interval and max
>>> >      >          >             count.
>>> >      >          >
>>> >      >          >             Test pipelines could then follow the
>>> generator
>>> >      >         with a Map
>>> >      >          >             function that creates whatever payloads
>>> are
>>> >      >         desirable.
>>> >      >          >
>>> >      >          >             Thoughts?
>>> >      >          >
>>> >      >          >             Thanks,
>>> >      >          >             Thomas
>>> >      >          >
>>> >      >
>>> >
>>>
>>

Re: Portable Flink runner: Generator source for testing

Posted by Thomas Weise <th...@apache.org>.
The portable runner does not support metrics yet:
https://s.apache.org/apache-beam-portability-support-table

There is also no JIRA referenced in the table, would be good to
locate/create it.

On Mon, Oct 8, 2018 at 9:11 AM Łukasz Gajowy <lu...@gmail.com>
wrote:

> Does anyone know what is the status of metrics support for Flink Portable
> Runner? I think we need them to be used in such tests to at least collect
> time metric that does not contain cluster warm up time, staging resources
> time and other things that can disturb the actual run time metric. We
> probably should use the metrics API in some other places (as described in
> the above-mentioned proposal).
>
>
>
> pon., 8 paź 2018 o 12:12 Maximilian Michels <mx...@apache.org> napisał(a):
>
>> This is correct. However, the example code is only part of Lyft's code
>> base. Until timer support is done, we would have to do something similar
>> in our code base.
>>
>> On 08.10.18 02:34, Łukasz Gajowy wrote:
>> > Hi,
>> >
>> > just to clarify, judging from the above snippets: it seems that we are
>> > able now to run tests that use a native source for data generation and
>> > use them in this form until the Timers are supported. When Timers are
>> > there, we should consider switching to the Impulse + PTransform based
>> > solution (described above) because it's more portable - the current is
>> > dedicated to Flink only (which still is really cool). Is this correct
>> or
>> > am I missing something?
>> >
>> > Łukasz
>> >
>> > pt., 5 paź 2018 o 14:04 Maximilian Michels <mxm@apache.org
>> > <ma...@apache.org>> napisał(a):
>> >
>> >     Thanks for sharing your setup. You're right that we need timers to
>> >     continuously ingest data to the testing pipeline.
>> >
>> >     Here is the Flink source which generates the data:
>> >
>> https://github.com/mwylde/beam/commit/09c62991773c749bc037cc2b6044896e2d34988a#diff-b2fc8d680d9c1da86ba23345f3bc83d4R42
>> >
>> >     On 04.10.18 19:31, Thomas Weise wrote:
>> >      > FYI here is an example with native generator for portable Flink
>> >     runner:
>> >      >
>> >      > https://github.com/mwylde/beam/tree/micah_memory_leak
>> >      >
>> >
>> https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda0636ca07e101/sdks/python/apache_beam/examples/streaming_leak.py
>> >      >
>> >      > You can use it to run the portable Flink runner in streaming mode
>> >      > continuously for testing purposes.
>> >      >
>> >      >
>> >      > On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <thw@apache.org
>> >     <ma...@apache.org>
>> >      > <mailto:thw@apache.org <ma...@apache.org>>> wrote:
>> >      >
>> >      >
>> >      >
>> >      >     On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels
>> >     <mxm@apache.org <ma...@apache.org>
>> >      >     <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
>> >      >
>> >      >          > and then have Flink manage the parallelism for stages
>> >      >         downstream from that?@Pablo Can you clarify what you mean
>> >     by that?
>> >      >
>> >      >         Let me paraphrase this just to get a clear understanding.
>> >     There
>> >      >         are two
>> >      >         approaches to test portable streaming pipelines:
>> >      >
>> >      >         a) Use an Impulse followed by a test PTransform which
>> >     generates
>> >      >         testing
>> >      >         data. This is similar to how streaming sources work which
>> >     don't
>> >      >         use the
>> >      >         Read Transform. For basic testing this should work, even
>> >     without
>> >      >         support
>> >      >         for Timers.
>> >      >
>> >      >
>> >      >     AFAIK this works for bounded sources and batch mode of the
>> Flink
>> >      >     runner (staged execution).
>> >      >
>> >      >     For streaming we need small bundles, we cannot have a Python
>> >     ParDo
>> >      >     block to emit records periodically.
>> >      >
>> >      >     (With timers, the ParDo wouldn't block but instead schedule
>> >     itself
>> >      >     as needed.)
>> >      >
>> >      >         b) Introduce a new URN which gets translated to a native
>> >      >         Flink/Spark/xy
>> >      >         testing transform.
>> >      >
>> >      >         We should go for a) as this will make testing easier
>> across
>> >      >         portable
>> >      >         runners. We previously discussed native transforms will
>> be an
>> >      >         option in
>> >      >         Beam, but it would be preferable to leave them out of
>> testing
>> >      >         for now.
>> >      >
>> >      >         Thanks,
>> >      >         Max
>> >      >
>> >      >
>> >      >         On 28.09.18 21:14, Thomas Weise wrote:
>> >      >          > Thanks for sharing the link, this looks very
>> promising!
>> >      >          >
>> >      >          > For the synthetic source, if we need a runner native
>> >     trigger
>> >      >         mechanism,
>> >      >          > then it should probably just emit an empty byte array
>> like
>> >      >         the impulse
>> >      >          > implementation does, and everything else could be left
>> >     to SDK
>> >      >         specific
>> >      >          > transforms that are downstream. We don't have support
>> for
>> >      >         timers in the
>> >      >          > portable Flink runner yet. With timers, there would
>> not be
>> >      >         the need for
>> >      >          > a runner native URN and it could work just like Pablo
>> >     described.
>> >      >          >
>> >      >          >
>> >      >          > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy
>> >      >         <lukasz.gajowy@gmail.com <mailto:lukasz.gajowy@gmail.com
>> >
>> >     <mailto:lukasz.gajowy@gmail.com <ma...@gmail.com>>
>> >      >          > <mailto:lukasz.gajowy@gmail.com
>> >     <ma...@gmail.com>
>> >      >         <mailto:lukasz.gajowy@gmail.com
>> >     <ma...@gmail.com>>>> wrote:
>> >      >          >
>> >      >          >     Hi all,
>> >      >          >
>> >      >          >     thank you, Thomas, for starting this discussion
>> >     and Pablo for
>> >      >          >     sharing the ideas. FWIW adding here, we discussed
>> >     this in
>> >      >         terms of
>> >      >          >     Core Beam Transform Load Tests that we are
>> working on
>> >      >         right now [1].
>> >      >          >     If generating synthetic data will be possible for
>> >      >         portable streaming
>> >      >          >     pipelines, we could use it in our work to test
>> Python
>> >      >         streaming
>> >      >          >     scenarios.
>> >      >          >
>> >      >          >     [1] _https://s.apache.org/GVMa_
>> >      >          >
>> >      >          >     pt., 28 wrz 2018 o 08:18 Pablo Estrada
>> >      >         <pabloem@google.com <ma...@google.com>
>> >     <mailto:pabloem@google.com <ma...@google.com>>
>> >      >          >     <mailto:pabloem@google.com
>> >     <ma...@google.com> <mailto:pabloem@google.com
>> >     <ma...@google.com>>>>
>> >      >         napisał(a):
>> >      >          >
>> >      >          >         Hi Thomas, all,
>> >      >          >         yes, this is quite important for testing, and
>> >     in fact
>> >      >         I'd think
>> >      >          >         it's important to streamline the insertion of
>> >     native
>> >      >         sources
>> >      >          >         from different runners to make the current
>> runners
>> >      >         more usable.
>> >      >          >         But that's another topic.
>> >      >          >
>> >      >          >         For generators of synthetic data, I had a
>> couple
>> >      >         ideas (and this
>> >      >          >         will show my limited knowledge about Flink and
>> >      >         Streaming, but oh
>> >      >          >         well):
>> >      >          >
>> >      >          >         - Flink experts: Is it possible to add a
>> pure-Beam
>> >      >         generator
>> >      >          >         that will do something like: Impulse ->
>> >      >         ParDo(generate multiple
>> >      >          >         elements) -> Forced "Write" to Flink (e.g.
>> >     something
>> >      >         like a
>> >      >          >         reshuffle), and then have Flink manage the
>> >      >         parallelism for
>> >      >          >         stages downstream from that?
>> >      >          >
>> >      >          >         - If this is not possible, it may be worth
>> >     writing some
>> >      >          >         transform in Flink / other runners that can be
>> >      >         plugged in by
>> >      >          >         inserting a custom URN. In fact, it may be a
>> >     good idea to
>> >      >          >         streamline the insertion of native sources for
>> >     each
>> >      >         runner based
>> >      >          >         on some sort of CustomURNTransform() ?
>> >      >          >
>> >      >          >         I hope I did not butcher those explanations
>> >     too badly...
>> >      >          >         Best
>> >      >          >         -P.
>> >      >          >
>> >      >          >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise
>> >      >         <thw@apache.org <ma...@apache.org>
>> >     <mailto:thw@apache.org <ma...@apache.org>>
>> >      >          >         <mailto:thw@apache.org <mailto:thw@apache.org
>> >
>> >     <mailto:thw@apache.org <ma...@apache.org>>>> wrote:
>> >      >          >
>> >      >          >             There were a few discussions how we can
>> >      >         facilitate testing
>> >      >          >             for portable streaming pipelines with the
>> >     Flink
>> >      >         runner. The
>> >      >          >             problem is that we currently don't have
>> >     streaming
>> >      >         sources in
>> >      >          >             the Python SDK.
>> >      >          >
>> >      >          >             One way to support testing could be a
>> >     generator
>> >      >         that extends
>> >      >          >             the idea of Impulse to provide a Flink
>> >     native trigger
>> >      >          >             transform, optionally parameterized with
>> an
>> >      >         interval and max
>> >      >          >             count.
>> >      >          >
>> >      >          >             Test pipelines could then follow the
>> generator
>> >      >         with a Map
>> >      >          >             function that creates whatever payloads
>> are
>> >      >         desirable.
>> >      >          >
>> >      >          >             Thoughts?
>> >      >          >
>> >      >          >             Thanks,
>> >      >          >             Thomas
>> >      >          >
>> >      >
>> >
>>
>

Re: Portable Flink runner: Generator source for testing

Posted by Łukasz Gajowy <lu...@gmail.com>.
Does anyone know what is the status of metrics support for Flink Portable
Runner? I think we need them to be used in such tests to at least collect
time metric that does not contain cluster warm up time, staging resources
time and other things that can disturb the actual run time metric. We
probably should use the metrics API in some other places (as described in
the above-mentioned proposal).



pon., 8 paź 2018 o 12:12 Maximilian Michels <mx...@apache.org> napisał(a):

> This is correct. However, the example code is only part of Lyft's code
> base. Until timer support is done, we would have to do something similar
> in our code base.
>
> On 08.10.18 02:34, Łukasz Gajowy wrote:
> > Hi,
> >
> > just to clarify, judging from the above snippets: it seems that we are
> > able now to run tests that use a native source for data generation and
> > use them in this form until the Timers are supported. When Timers are
> > there, we should consider switching to the Impulse + PTransform based
> > solution (described above) because it's more portable - the current is
> > dedicated to Flink only (which still is really cool). Is this correct or
> > am I missing something?
> >
> > Łukasz
> >
> > pt., 5 paź 2018 o 14:04 Maximilian Michels <mxm@apache.org
> > <ma...@apache.org>> napisał(a):
> >
> >     Thanks for sharing your setup. You're right that we need timers to
> >     continuously ingest data to the testing pipeline.
> >
> >     Here is the Flink source which generates the data:
> >
> https://github.com/mwylde/beam/commit/09c62991773c749bc037cc2b6044896e2d34988a#diff-b2fc8d680d9c1da86ba23345f3bc83d4R42
> >
> >     On 04.10.18 19:31, Thomas Weise wrote:
> >      > FYI here is an example with native generator for portable Flink
> >     runner:
> >      >
> >      > https://github.com/mwylde/beam/tree/micah_memory_leak
> >      >
> >
> https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda0636ca07e101/sdks/python/apache_beam/examples/streaming_leak.py
> >      >
> >      > You can use it to run the portable Flink runner in streaming mode
> >      > continuously for testing purposes.
> >      >
> >      >
> >      > On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <thw@apache.org
> >     <ma...@apache.org>
> >      > <mailto:thw@apache.org <ma...@apache.org>>> wrote:
> >      >
> >      >
> >      >
> >      >     On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels
> >     <mxm@apache.org <ma...@apache.org>
> >      >     <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
> >      >
> >      >          > and then have Flink manage the parallelism for stages
> >      >         downstream from that?@Pablo Can you clarify what you mean
> >     by that?
> >      >
> >      >         Let me paraphrase this just to get a clear understanding.
> >     There
> >      >         are two
> >      >         approaches to test portable streaming pipelines:
> >      >
> >      >         a) Use an Impulse followed by a test PTransform which
> >     generates
> >      >         testing
> >      >         data. This is similar to how streaming sources work which
> >     don't
> >      >         use the
> >      >         Read Transform. For basic testing this should work, even
> >     without
> >      >         support
> >      >         for Timers.
> >      >
> >      >
> >      >     AFAIK this works for bounded sources and batch mode of the
> Flink
> >      >     runner (staged execution).
> >      >
> >      >     For streaming we need small bundles, we cannot have a Python
> >     ParDo
> >      >     block to emit records periodically.
> >      >
> >      >     (With timers, the ParDo wouldn't block but instead schedule
> >     itself
> >      >     as needed.)
> >      >
> >      >         b) Introduce a new URN which gets translated to a native
> >      >         Flink/Spark/xy
> >      >         testing transform.
> >      >
> >      >         We should go for a) as this will make testing easier
> across
> >      >         portable
> >      >         runners. We previously discussed native transforms will
> be an
> >      >         option in
> >      >         Beam, but it would be preferable to leave them out of
> testing
> >      >         for now.
> >      >
> >      >         Thanks,
> >      >         Max
> >      >
> >      >
> >      >         On 28.09.18 21:14, Thomas Weise wrote:
> >      >          > Thanks for sharing the link, this looks very promising!
> >      >          >
> >      >          > For the synthetic source, if we need a runner native
> >     trigger
> >      >         mechanism,
> >      >          > then it should probably just emit an empty byte array
> like
> >      >         the impulse
> >      >          > implementation does, and everything else could be left
> >     to SDK
> >      >         specific
> >      >          > transforms that are downstream. We don't have support
> for
> >      >         timers in the
> >      >          > portable Flink runner yet. With timers, there would
> not be
> >      >         the need for
> >      >          > a runner native URN and it could work just like Pablo
> >     described.
> >      >          >
> >      >          >
> >      >          > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy
> >      >         <lukasz.gajowy@gmail.com <ma...@gmail.com>
> >     <mailto:lukasz.gajowy@gmail.com <ma...@gmail.com>>
> >      >          > <mailto:lukasz.gajowy@gmail.com
> >     <ma...@gmail.com>
> >      >         <mailto:lukasz.gajowy@gmail.com
> >     <ma...@gmail.com>>>> wrote:
> >      >          >
> >      >          >     Hi all,
> >      >          >
> >      >          >     thank you, Thomas, for starting this discussion
> >     and Pablo for
> >      >          >     sharing the ideas. FWIW adding here, we discussed
> >     this in
> >      >         terms of
> >      >          >     Core Beam Transform Load Tests that we are working
> on
> >      >         right now [1].
> >      >          >     If generating synthetic data will be possible for
> >      >         portable streaming
> >      >          >     pipelines, we could use it in our work to test
> Python
> >      >         streaming
> >      >          >     scenarios.
> >      >          >
> >      >          >     [1] _https://s.apache.org/GVMa_
> >      >          >
> >      >          >     pt., 28 wrz 2018 o 08:18 Pablo Estrada
> >      >         <pabloem@google.com <ma...@google.com>
> >     <mailto:pabloem@google.com <ma...@google.com>>
> >      >          >     <mailto:pabloem@google.com
> >     <ma...@google.com> <mailto:pabloem@google.com
> >     <ma...@google.com>>>>
> >      >         napisał(a):
> >      >          >
> >      >          >         Hi Thomas, all,
> >      >          >         yes, this is quite important for testing, and
> >     in fact
> >      >         I'd think
> >      >          >         it's important to streamline the insertion of
> >     native
> >      >         sources
> >      >          >         from different runners to make the current
> runners
> >      >         more usable.
> >      >          >         But that's another topic.
> >      >          >
> >      >          >         For generators of synthetic data, I had a
> couple
> >      >         ideas (and this
> >      >          >         will show my limited knowledge about Flink and
> >      >         Streaming, but oh
> >      >          >         well):
> >      >          >
> >      >          >         - Flink experts: Is it possible to add a
> pure-Beam
> >      >         generator
> >      >          >         that will do something like: Impulse ->
> >      >         ParDo(generate multiple
> >      >          >         elements) -> Forced "Write" to Flink (e.g.
> >     something
> >      >         like a
> >      >          >         reshuffle), and then have Flink manage the
> >      >         parallelism for
> >      >          >         stages downstream from that?
> >      >          >
> >      >          >         - If this is not possible, it may be worth
> >     writing some
> >      >          >         transform in Flink / other runners that can be
> >      >         plugged in by
> >      >          >         inserting a custom URN. In fact, it may be a
> >     good idea to
> >      >          >         streamline the insertion of native sources for
> >     each
> >      >         runner based
> >      >          >         on some sort of CustomURNTransform() ?
> >      >          >
> >      >          >         I hope I did not butcher those explanations
> >     too badly...
> >      >          >         Best
> >      >          >         -P.
> >      >          >
> >      >          >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise
> >      >         <thw@apache.org <ma...@apache.org>
> >     <mailto:thw@apache.org <ma...@apache.org>>
> >      >          >         <mailto:thw@apache.org <ma...@apache.org>
> >     <mailto:thw@apache.org <ma...@apache.org>>>> wrote:
> >      >          >
> >      >          >             There were a few discussions how we can
> >      >         facilitate testing
> >      >          >             for portable streaming pipelines with the
> >     Flink
> >      >         runner. The
> >      >          >             problem is that we currently don't have
> >     streaming
> >      >         sources in
> >      >          >             the Python SDK.
> >      >          >
> >      >          >             One way to support testing could be a
> >     generator
> >      >         that extends
> >      >          >             the idea of Impulse to provide a Flink
> >     native trigger
> >      >          >             transform, optionally parameterized with an
> >      >         interval and max
> >      >          >             count.
> >      >          >
> >      >          >             Test pipelines could then follow the
> generator
> >      >         with a Map
> >      >          >             function that creates whatever payloads are
> >      >         desirable.
> >      >          >
> >      >          >             Thoughts?
> >      >          >
> >      >          >             Thanks,
> >      >          >             Thomas
> >      >          >
> >      >
> >
>

Re: Portable Flink runner: Generator source for testing

Posted by Maximilian Michels <mx...@apache.org>.
This is correct. However, the example code is only part of Lyft's code 
base. Until timer support is done, we would have to do something similar 
in our code base.

On 08.10.18 02:34, Łukasz Gajowy wrote:
> Hi,
> 
> just to clarify, judging from the above snippets: it seems that we are 
> able now to run tests that use a native source for data generation and 
> use them in this form until the Timers are supported. When Timers are 
> there, we should consider switching to the Impulse + PTransform based 
> solution (described above) because it's more portable - the current is 
> dedicated to Flink only (which still is really cool). Is this correct or 
> am I missing something?
> 
> Łukasz
> 
> pt., 5 paź 2018 o 14:04 Maximilian Michels <mxm@apache.org 
> <ma...@apache.org>> napisał(a):
> 
>     Thanks for sharing your setup. You're right that we need timers to
>     continuously ingest data to the testing pipeline.
> 
>     Here is the Flink source which generates the data:
>     https://github.com/mwylde/beam/commit/09c62991773c749bc037cc2b6044896e2d34988a#diff-b2fc8d680d9c1da86ba23345f3bc83d4R42
> 
>     On 04.10.18 19:31, Thomas Weise wrote:
>      > FYI here is an example with native generator for portable Flink
>     runner:
>      >
>      > https://github.com/mwylde/beam/tree/micah_memory_leak
>      >
>     https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda0636ca07e101/sdks/python/apache_beam/examples/streaming_leak.py
>      >
>      > You can use it to run the portable Flink runner in streaming mode
>      > continuously for testing purposes.
>      >
>      >
>      > On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <thw@apache.org
>     <ma...@apache.org>
>      > <mailto:thw@apache.org <ma...@apache.org>>> wrote:
>      >
>      >
>      >
>      >     On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels
>     <mxm@apache.org <ma...@apache.org>
>      >     <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
>      >
>      >          > and then have Flink manage the parallelism for stages
>      >         downstream from that?@Pablo Can you clarify what you mean
>     by that?
>      >
>      >         Let me paraphrase this just to get a clear understanding.
>     There
>      >         are two
>      >         approaches to test portable streaming pipelines:
>      >
>      >         a) Use an Impulse followed by a test PTransform which
>     generates
>      >         testing
>      >         data. This is similar to how streaming sources work which
>     don't
>      >         use the
>      >         Read Transform. For basic testing this should work, even
>     without
>      >         support
>      >         for Timers.
>      >
>      >
>      >     AFAIK this works for bounded sources and batch mode of the Flink
>      >     runner (staged execution).
>      >
>      >     For streaming we need small bundles, we cannot have a Python
>     ParDo
>      >     block to emit records periodically.
>      >
>      >     (With timers, the ParDo wouldn't block but instead schedule
>     itself
>      >     as needed.)
>      >
>      >         b) Introduce a new URN which gets translated to a native
>      >         Flink/Spark/xy
>      >         testing transform.
>      >
>      >         We should go for a) as this will make testing easier across
>      >         portable
>      >         runners. We previously discussed native transforms will be an
>      >         option in
>      >         Beam, but it would be preferable to leave them out of testing
>      >         for now.
>      >
>      >         Thanks,
>      >         Max
>      >
>      >
>      >         On 28.09.18 21:14, Thomas Weise wrote:
>      >          > Thanks for sharing the link, this looks very promising!
>      >          >
>      >          > For the synthetic source, if we need a runner native
>     trigger
>      >         mechanism,
>      >          > then it should probably just emit an empty byte array like
>      >         the impulse
>      >          > implementation does, and everything else could be left
>     to SDK
>      >         specific
>      >          > transforms that are downstream. We don't have support for
>      >         timers in the
>      >          > portable Flink runner yet. With timers, there would not be
>      >         the need for
>      >          > a runner native URN and it could work just like Pablo
>     described.
>      >          >
>      >          >
>      >          > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy
>      >         <lukasz.gajowy@gmail.com <ma...@gmail.com>
>     <mailto:lukasz.gajowy@gmail.com <ma...@gmail.com>>
>      >          > <mailto:lukasz.gajowy@gmail.com
>     <ma...@gmail.com>
>      >         <mailto:lukasz.gajowy@gmail.com
>     <ma...@gmail.com>>>> wrote:
>      >          >
>      >          >     Hi all,
>      >          >
>      >          >     thank you, Thomas, for starting this discussion
>     and Pablo for
>      >          >     sharing the ideas. FWIW adding here, we discussed
>     this in
>      >         terms of
>      >          >     Core Beam Transform Load Tests that we are working on
>      >         right now [1].
>      >          >     If generating synthetic data will be possible for
>      >         portable streaming
>      >          >     pipelines, we could use it in our work to test Python
>      >         streaming
>      >          >     scenarios.
>      >          >
>      >          >     [1] _https://s.apache.org/GVMa_
>      >          >
>      >          >     pt., 28 wrz 2018 o 08:18 Pablo Estrada
>      >         <pabloem@google.com <ma...@google.com>
>     <mailto:pabloem@google.com <ma...@google.com>>
>      >          >     <mailto:pabloem@google.com
>     <ma...@google.com> <mailto:pabloem@google.com
>     <ma...@google.com>>>>
>      >         napisał(a):
>      >          >
>      >          >         Hi Thomas, all,
>      >          >         yes, this is quite important for testing, and
>     in fact
>      >         I'd think
>      >          >         it's important to streamline the insertion of
>     native
>      >         sources
>      >          >         from different runners to make the current runners
>      >         more usable.
>      >          >         But that's another topic.
>      >          >
>      >          >         For generators of synthetic data, I had a couple
>      >         ideas (and this
>      >          >         will show my limited knowledge about Flink and
>      >         Streaming, but oh
>      >          >         well):
>      >          >
>      >          >         - Flink experts: Is it possible to add a pure-Beam
>      >         generator
>      >          >         that will do something like: Impulse ->
>      >         ParDo(generate multiple
>      >          >         elements) -> Forced "Write" to Flink (e.g.
>     something
>      >         like a
>      >          >         reshuffle), and then have Flink manage the
>      >         parallelism for
>      >          >         stages downstream from that?
>      >          >
>      >          >         - If this is not possible, it may be worth
>     writing some
>      >          >         transform in Flink / other runners that can be
>      >         plugged in by
>      >          >         inserting a custom URN. In fact, it may be a
>     good idea to
>      >          >         streamline the insertion of native sources for
>     each
>      >         runner based
>      >          >         on some sort of CustomURNTransform() ?
>      >          >
>      >          >         I hope I did not butcher those explanations
>     too badly...
>      >          >         Best
>      >          >         -P.
>      >          >
>      >          >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise
>      >         <thw@apache.org <ma...@apache.org>
>     <mailto:thw@apache.org <ma...@apache.org>>
>      >          >         <mailto:thw@apache.org <ma...@apache.org>
>     <mailto:thw@apache.org <ma...@apache.org>>>> wrote:
>      >          >
>      >          >             There were a few discussions how we can
>      >         facilitate testing
>      >          >             for portable streaming pipelines with the
>     Flink
>      >         runner. The
>      >          >             problem is that we currently don't have
>     streaming
>      >         sources in
>      >          >             the Python SDK.
>      >          >
>      >          >             One way to support testing could be a
>     generator
>      >         that extends
>      >          >             the idea of Impulse to provide a Flink
>     native trigger
>      >          >             transform, optionally parameterized with an
>      >         interval and max
>      >          >             count.
>      >          >
>      >          >             Test pipelines could then follow the generator
>      >         with a Map
>      >          >             function that creates whatever payloads are
>      >         desirable.
>      >          >
>      >          >             Thoughts?
>      >          >
>      >          >             Thanks,
>      >          >             Thomas
>      >          >
>      >
> 

Re: Portable Flink runner: Generator source for testing

Posted by Łukasz Gajowy <lg...@apache.org>.
Hi,

just to clarify, judging from the above snippets: it seems that we are able
now to run tests that use a native source for data generation and use them
in this form until the Timers are supported. When Timers are there, we
should consider switching to the Impulse + PTransform based solution
(described above) because it's more portable - the current is dedicated to
Flink only (which still is really cool). Is this correct or am I missing
something?

Łukasz

pt., 5 paź 2018 o 14:04 Maximilian Michels <mx...@apache.org> napisał(a):

> Thanks for sharing your setup. You're right that we need timers to
> continuously ingest data to the testing pipeline.
>
> Here is the Flink source which generates the data:
>
> https://github.com/mwylde/beam/commit/09c62991773c749bc037cc2b6044896e2d34988a#diff-b2fc8d680d9c1da86ba23345f3bc83d4R42
>
> On 04.10.18 19:31, Thomas Weise wrote:
> > FYI here is an example with native generator for portable Flink runner:
> >
> > https://github.com/mwylde/beam/tree/micah_memory_leak
> >
> https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda0636ca07e101/sdks/python/apache_beam/examples/streaming_leak.py
> >
> > You can use it to run the portable Flink runner in streaming mode
> > continuously for testing purposes.
> >
> >
> > On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <thw@apache.org
> > <ma...@apache.org>> wrote:
> >
> >
> >
> >     On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels <mxm@apache.org
> >     <ma...@apache.org>> wrote:
> >
> >          > and then have Flink manage the parallelism for stages
> >         downstream from that?@Pablo Can you clarify what you mean by
> that?
> >
> >         Let me paraphrase this just to get a clear understanding. There
> >         are two
> >         approaches to test portable streaming pipelines:
> >
> >         a) Use an Impulse followed by a test PTransform which generates
> >         testing
> >         data. This is similar to how streaming sources work which don't
> >         use the
> >         Read Transform. For basic testing this should work, even without
> >         support
> >         for Timers.
> >
> >
> >     AFAIK this works for bounded sources and batch mode of the Flink
> >     runner (staged execution).
> >
> >     For streaming we need small bundles, we cannot have a Python ParDo
> >     block to emit records periodically.
> >
> >     (With timers, the ParDo wouldn't block but instead schedule itself
> >     as needed.)
> >
> >         b) Introduce a new URN which gets translated to a native
> >         Flink/Spark/xy
> >         testing transform.
> >
> >         We should go for a) as this will make testing easier across
> >         portable
> >         runners. We previously discussed native transforms will be an
> >         option in
> >         Beam, but it would be preferable to leave them out of testing
> >         for now.
> >
> >         Thanks,
> >         Max
> >
> >
> >         On 28.09.18 21:14, Thomas Weise wrote:
> >          > Thanks for sharing the link, this looks very promising!
> >          >
> >          > For the synthetic source, if we need a runner native trigger
> >         mechanism,
> >          > then it should probably just emit an empty byte array like
> >         the impulse
> >          > implementation does, and everything else could be left to SDK
> >         specific
> >          > transforms that are downstream. We don't have support for
> >         timers in the
> >          > portable Flink runner yet. With timers, there would not be
> >         the need for
> >          > a runner native URN and it could work just like Pablo
> described.
> >          >
> >          >
> >          > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy
> >         <lukasz.gajowy@gmail.com <ma...@gmail.com>
> >          > <mailto:lukasz.gajowy@gmail.com
> >         <ma...@gmail.com>>> wrote:
> >          >
> >          >     Hi all,
> >          >
> >          >     thank you, Thomas, for starting this discussion and Pablo
> for
> >          >     sharing the ideas. FWIW adding here, we discussed this in
> >         terms of
> >          >     Core Beam Transform Load Tests that we are working on
> >         right now [1].
> >          >     If generating synthetic data will be possible for
> >         portable streaming
> >          >     pipelines, we could use it in our work to test Python
> >         streaming
> >          >     scenarios.
> >          >
> >          >     [1] _https://s.apache.org/GVMa_
> >          >
> >          >     pt., 28 wrz 2018 o 08:18 Pablo Estrada
> >         <pabloem@google.com <ma...@google.com>
> >          >     <mailto:pabloem@google.com <ma...@google.com>>>
> >         napisał(a):
> >          >
> >          >         Hi Thomas, all,
> >          >         yes, this is quite important for testing, and in fact
> >         I'd think
> >          >         it's important to streamline the insertion of native
> >         sources
> >          >         from different runners to make the current runners
> >         more usable.
> >          >         But that's another topic.
> >          >
> >          >         For generators of synthetic data, I had a couple
> >         ideas (and this
> >          >         will show my limited knowledge about Flink and
> >         Streaming, but oh
> >          >         well):
> >          >
> >          >         - Flink experts: Is it possible to add a pure-Beam
> >         generator
> >          >         that will do something like: Impulse ->
> >         ParDo(generate multiple
> >          >         elements) -> Forced "Write" to Flink (e.g. something
> >         like a
> >          >         reshuffle), and then have Flink manage the
> >         parallelism for
> >          >         stages downstream from that?
> >          >
> >          >         - If this is not possible, it may be worth writing
> some
> >          >         transform in Flink / other runners that can be
> >         plugged in by
> >          >         inserting a custom URN. In fact, it may be a good
> idea to
> >          >         streamline the insertion of native sources for each
> >         runner based
> >          >         on some sort of CustomURNTransform() ?
> >          >
> >          >         I hope I did not butcher those explanations too
> badly...
> >          >         Best
> >          >         -P.
> >          >
> >          >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise
> >         <thw@apache.org <ma...@apache.org>
> >          >         <mailto:thw@apache.org <ma...@apache.org>>>
> wrote:
> >          >
> >          >             There were a few discussions how we can
> >         facilitate testing
> >          >             for portable streaming pipelines with the Flink
> >         runner. The
> >          >             problem is that we currently don't have streaming
> >         sources in
> >          >             the Python SDK.
> >          >
> >          >             One way to support testing could be a generator
> >         that extends
> >          >             the idea of Impulse to provide a Flink native
> trigger
> >          >             transform, optionally parameterized with an
> >         interval and max
> >          >             count.
> >          >
> >          >             Test pipelines could then follow the generator
> >         with a Map
> >          >             function that creates whatever payloads are
> >         desirable.
> >          >
> >          >             Thoughts?
> >          >
> >          >             Thanks,
> >          >             Thomas
> >          >
> >
>

Re: Portable Flink runner: Generator source for testing

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for sharing your setup. You're right that we need timers to 
continuously ingest data to the testing pipeline.

Here is the Flink source which generates the data:
https://github.com/mwylde/beam/commit/09c62991773c749bc037cc2b6044896e2d34988a#diff-b2fc8d680d9c1da86ba23345f3bc83d4R42

On 04.10.18 19:31, Thomas Weise wrote:
> FYI here is an example with native generator for portable Flink runner:
> 
> https://github.com/mwylde/beam/tree/micah_memory_leak
> https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda0636ca07e101/sdks/python/apache_beam/examples/streaming_leak.py
> 
> You can use it to run the portable Flink runner in streaming mode 
> continuously for testing purposes.
> 
> 
> On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <thw@apache.org 
> <ma...@apache.org>> wrote:
> 
> 
> 
>     On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels <mxm@apache.org
>     <ma...@apache.org>> wrote:
> 
>          > and then have Flink manage the parallelism for stages
>         downstream from that?@Pablo Can you clarify what you mean by that?
> 
>         Let me paraphrase this just to get a clear understanding. There
>         are two
>         approaches to test portable streaming pipelines:
> 
>         a) Use an Impulse followed by a test PTransform which generates
>         testing
>         data. This is similar to how streaming sources work which don't
>         use the
>         Read Transform. For basic testing this should work, even without
>         support
>         for Timers.
> 
> 
>     AFAIK this works for bounded sources and batch mode of the Flink
>     runner (staged execution).
> 
>     For streaming we need small bundles, we cannot have a Python ParDo
>     block to emit records periodically.
> 
>     (With timers, the ParDo wouldn't block but instead schedule itself
>     as needed.)
> 
>         b) Introduce a new URN which gets translated to a native
>         Flink/Spark/xy
>         testing transform.
> 
>         We should go for a) as this will make testing easier across
>         portable
>         runners. We previously discussed native transforms will be an
>         option in
>         Beam, but it would be preferable to leave them out of testing
>         for now.
> 
>         Thanks,
>         Max
> 
> 
>         On 28.09.18 21:14, Thomas Weise wrote:
>          > Thanks for sharing the link, this looks very promising!
>          >
>          > For the synthetic source, if we need a runner native trigger
>         mechanism,
>          > then it should probably just emit an empty byte array like
>         the impulse
>          > implementation does, and everything else could be left to SDK
>         specific
>          > transforms that are downstream. We don't have support for
>         timers in the
>          > portable Flink runner yet. With timers, there would not be
>         the need for
>          > a runner native URN and it could work just like Pablo described.
>          >
>          >
>          > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy
>         <lukasz.gajowy@gmail.com <ma...@gmail.com>
>          > <mailto:lukasz.gajowy@gmail.com
>         <ma...@gmail.com>>> wrote:
>          >
>          >     Hi all,
>          >
>          >     thank you, Thomas, for starting this discussion and Pablo for
>          >     sharing the ideas. FWIW adding here, we discussed this in
>         terms of
>          >     Core Beam Transform Load Tests that we are working on
>         right now [1].
>          >     If generating synthetic data will be possible for
>         portable streaming
>          >     pipelines, we could use it in our work to test Python
>         streaming
>          >     scenarios.
>          >
>          >     [1] _https://s.apache.org/GVMa_
>          >
>          >     pt., 28 wrz 2018 o 08:18 Pablo Estrada
>         <pabloem@google.com <ma...@google.com>
>          >     <mailto:pabloem@google.com <ma...@google.com>>>
>         napisał(a):
>          >
>          >         Hi Thomas, all,
>          >         yes, this is quite important for testing, and in fact
>         I'd think
>          >         it's important to streamline the insertion of native
>         sources
>          >         from different runners to make the current runners
>         more usable.
>          >         But that's another topic.
>          >
>          >         For generators of synthetic data, I had a couple
>         ideas (and this
>          >         will show my limited knowledge about Flink and
>         Streaming, but oh
>          >         well):
>          >
>          >         - Flink experts: Is it possible to add a pure-Beam
>         generator
>          >         that will do something like: Impulse ->
>         ParDo(generate multiple
>          >         elements) -> Forced "Write" to Flink (e.g. something
>         like a
>          >         reshuffle), and then have Flink manage the
>         parallelism for
>          >         stages downstream from that?
>          >
>          >         - If this is not possible, it may be worth writing some
>          >         transform in Flink / other runners that can be
>         plugged in by
>          >         inserting a custom URN. In fact, it may be a good idea to
>          >         streamline the insertion of native sources for each
>         runner based
>          >         on some sort of CustomURNTransform() ?
>          >
>          >         I hope I did not butcher those explanations too badly...
>          >         Best
>          >         -P.
>          >
>          >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise
>         <thw@apache.org <ma...@apache.org>
>          >         <mailto:thw@apache.org <ma...@apache.org>>> wrote:
>          >
>          >             There were a few discussions how we can
>         facilitate testing
>          >             for portable streaming pipelines with the Flink
>         runner. The
>          >             problem is that we currently don't have streaming
>         sources in
>          >             the Python SDK.
>          >
>          >             One way to support testing could be a generator
>         that extends
>          >             the idea of Impulse to provide a Flink native trigger
>          >             transform, optionally parameterized with an
>         interval and max
>          >             count.
>          >
>          >             Test pipelines could then follow the generator
>         with a Map
>          >             function that creates whatever payloads are
>         desirable.
>          >
>          >             Thoughts?
>          >
>          >             Thanks,
>          >             Thomas
>          >
> 

Re: Portable Flink runner: Generator source for testing

Posted by Thomas Weise <th...@apache.org>.
FYI here is an example with native generator for portable Flink runner:

https://github.com/mwylde/beam/tree/micah_memory_leak
https://github.com/mwylde/beam/blob/22f7099b071e65a76110ecc5beda0636ca07e101/sdks/python/apache_beam/examples/streaming_leak.py

You can use it to run the portable Flink runner in streaming mode
continuously for testing purposes.


On Mon, Oct 1, 2018 at 9:50 AM Thomas Weise <th...@apache.org> wrote:

>
>
> On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels <mx...@apache.org> wrote:
>
>> > and then have Flink manage the parallelism for stages downstream from
>> that?@Pablo Can you clarify what you mean by that?
>>
>> Let me paraphrase this just to get a clear understanding. There are two
>> approaches to test portable streaming pipelines:
>>
>> a) Use an Impulse followed by a test PTransform which generates testing
>> data. This is similar to how streaming sources work which don't use the
>> Read Transform. For basic testing this should work, even without support
>> for Timers.
>>
>
> AFAIK this works for bounded sources and batch mode of the Flink runner
> (staged execution).
>
> For streaming we need small bundles, we cannot have a Python ParDo block
> to emit records periodically.
>
> (With timers, the ParDo wouldn't block but instead schedule itself as
> needed.)
>
> b) Introduce a new URN which gets translated to a native Flink/Spark/xy
>> testing transform.
>>
>> We should go for a) as this will make testing easier across portable
>> runners. We previously discussed native transforms will be an option in
>> Beam, but it would be preferable to leave them out of testing for now.
>>
>> Thanks,
>> Max
>>
>>
>> On 28.09.18 21:14, Thomas Weise wrote:
>> > Thanks for sharing the link, this looks very promising!
>> >
>> > For the synthetic source, if we need a runner native trigger mechanism,
>> > then it should probably just emit an empty byte array like the impulse
>> > implementation does, and everything else could be left to SDK specific
>> > transforms that are downstream. We don't have support for timers in the
>> > portable Flink runner yet. With timers, there would not be the need for
>> > a runner native URN and it could work just like Pablo described.
>> >
>> >
>> > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy <lukasz.gajowy@gmail.com
>> > <ma...@gmail.com>> wrote:
>> >
>> >     Hi all,
>> >
>> >     thank you, Thomas, for starting this discussion and Pablo for
>> >     sharing the ideas. FWIW adding here, we discussed this in terms of
>> >     Core Beam Transform Load Tests that we are working on right now [1].
>> >     If generating synthetic data will be possible for portable streaming
>> >     pipelines, we could use it in our work to test Python streaming
>> >     scenarios.
>> >
>> >     [1] _https://s.apache.org/GVMa_
>> >
>> >     pt., 28 wrz 2018 o 08:18 Pablo Estrada <pabloem@google.com
>> >     <ma...@google.com>> napisał(a):
>> >
>> >         Hi Thomas, all,
>> >         yes, this is quite important for testing, and in fact I'd think
>> >         it's important to streamline the insertion of native sources
>> >         from different runners to make the current runners more usable.
>> >         But that's another topic.
>> >
>> >         For generators of synthetic data, I had a couple ideas (and this
>> >         will show my limited knowledge about Flink and Streaming, but oh
>> >         well):
>> >
>> >         - Flink experts: Is it possible to add a pure-Beam generator
>> >         that will do something like: Impulse -> ParDo(generate multiple
>> >         elements) -> Forced "Write" to Flink (e.g. something like a
>> >         reshuffle), and then have Flink manage the parallelism for
>> >         stages downstream from that?
>> >
>> >         - If this is not possible, it may be worth writing some
>> >         transform in Flink / other runners that can be plugged in by
>> >         inserting a custom URN. In fact, it may be a good idea to
>> >         streamline the insertion of native sources for each runner based
>> >         on some sort of CustomURNTransform() ?
>> >
>> >         I hope I did not butcher those explanations too badly...
>> >         Best
>> >         -P.
>> >
>> >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise <thw@apache.org
>> >         <ma...@apache.org>> wrote:
>> >
>> >             There were a few discussions how we can facilitate testing
>> >             for portable streaming pipelines with the Flink runner. The
>> >             problem is that we currently don't have streaming sources in
>> >             the Python SDK.
>> >
>> >             One way to support testing could be a generator that extends
>> >             the idea of Impulse to provide a Flink native trigger
>> >             transform, optionally parameterized with an interval and max
>> >             count.
>> >
>> >             Test pipelines could then follow the generator with a Map
>> >             function that creates whatever payloads are desirable.
>> >
>> >             Thoughts?
>> >
>> >             Thanks,
>> >             Thomas
>> >
>>
>

Re: Portable Flink runner: Generator source for testing

Posted by Thomas Weise <th...@apache.org>.
On Mon, Oct 1, 2018 at 8:29 AM Maximilian Michels <mx...@apache.org> wrote:

> > and then have Flink manage the parallelism for stages downstream from
> that?@Pablo Can you clarify what you mean by that?
>
> Let me paraphrase this just to get a clear understanding. There are two
> approaches to test portable streaming pipelines:
>
> a) Use an Impulse followed by a test PTransform which generates testing
> data. This is similar to how streaming sources work which don't use the
> Read Transform. For basic testing this should work, even without support
> for Timers.
>

AFAIK this works for bounded sources and batch mode of the Flink runner
(staged execution).

For streaming we need small bundles, we cannot have a Python ParDo block to
emit records periodically.

(With timers, the ParDo wouldn't block but instead schedule itself as
needed.)

b) Introduce a new URN which gets translated to a native Flink/Spark/xy
> testing transform.
>
> We should go for a) as this will make testing easier across portable
> runners. We previously discussed native transforms will be an option in
> Beam, but it would be preferable to leave them out of testing for now.
>
> Thanks,
> Max
>
>
> On 28.09.18 21:14, Thomas Weise wrote:
> > Thanks for sharing the link, this looks very promising!
> >
> > For the synthetic source, if we need a runner native trigger mechanism,
> > then it should probably just emit an empty byte array like the impulse
> > implementation does, and everything else could be left to SDK specific
> > transforms that are downstream. We don't have support for timers in the
> > portable Flink runner yet. With timers, there would not be the need for
> > a runner native URN and it could work just like Pablo described.
> >
> >
> > On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy <lukasz.gajowy@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> >     Hi all,
> >
> >     thank you, Thomas, for starting this discussion and Pablo for
> >     sharing the ideas. FWIW adding here, we discussed this in terms of
> >     Core Beam Transform Load Tests that we are working on right now [1].
> >     If generating synthetic data will be possible for portable streaming
> >     pipelines, we could use it in our work to test Python streaming
> >     scenarios.
> >
> >     [1] _https://s.apache.org/GVMa_
> >
> >     pt., 28 wrz 2018 o 08:18 Pablo Estrada <pabloem@google.com
> >     <ma...@google.com>> napisał(a):
> >
> >         Hi Thomas, all,
> >         yes, this is quite important for testing, and in fact I'd think
> >         it's important to streamline the insertion of native sources
> >         from different runners to make the current runners more usable.
> >         But that's another topic.
> >
> >         For generators of synthetic data, I had a couple ideas (and this
> >         will show my limited knowledge about Flink and Streaming, but oh
> >         well):
> >
> >         - Flink experts: Is it possible to add a pure-Beam generator
> >         that will do something like: Impulse -> ParDo(generate multiple
> >         elements) -> Forced "Write" to Flink (e.g. something like a
> >         reshuffle), and then have Flink manage the parallelism for
> >         stages downstream from that?
> >
> >         - If this is not possible, it may be worth writing some
> >         transform in Flink / other runners that can be plugged in by
> >         inserting a custom URN. In fact, it may be a good idea to
> >         streamline the insertion of native sources for each runner based
> >         on some sort of CustomURNTransform() ?
> >
> >         I hope I did not butcher those explanations too badly...
> >         Best
> >         -P.
> >
> >         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise <thw@apache.org
> >         <ma...@apache.org>> wrote:
> >
> >             There were a few discussions how we can facilitate testing
> >             for portable streaming pipelines with the Flink runner. The
> >             problem is that we currently don't have streaming sources in
> >             the Python SDK.
> >
> >             One way to support testing could be a generator that extends
> >             the idea of Impulse to provide a Flink native trigger
> >             transform, optionally parameterized with an interval and max
> >             count.
> >
> >             Test pipelines could then follow the generator with a Map
> >             function that creates whatever payloads are desirable.
> >
> >             Thoughts?
> >
> >             Thanks,
> >             Thomas
> >
>

Re: Portable Flink runner: Generator source for testing

Posted by Maximilian Michels <mx...@apache.org>.
> and then have Flink manage the parallelism for stages downstream from that?@Pablo Can you clarify what you mean by that?

Let me paraphrase this just to get a clear understanding. There are two 
approaches to test portable streaming pipelines:

a) Use an Impulse followed by a test PTransform which generates testing 
data. This is similar to how streaming sources work which don't use the 
Read Transform. For basic testing this should work, even without support 
for Timers.

b) Introduce a new URN which gets translated to a native Flink/Spark/xy 
testing transform.

We should go for a) as this will make testing easier across portable 
runners. We previously discussed native transforms will be an option in 
Beam, but it would be preferable to leave them out of testing for now.

Thanks,
Max


On 28.09.18 21:14, Thomas Weise wrote:
> Thanks for sharing the link, this looks very promising!
> 
> For the synthetic source, if we need a runner native trigger mechanism, 
> then it should probably just emit an empty byte array like the impulse 
> implementation does, and everything else could be left to SDK specific 
> transforms that are downstream. We don't have support for timers in the 
> portable Flink runner yet. With timers, there would not be the need for 
> a runner native URN and it could work just like Pablo described.
> 
> 
> On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy <lukasz.gajowy@gmail.com 
> <ma...@gmail.com>> wrote:
> 
>     Hi all,
> 
>     thank you, Thomas, for starting this discussion and Pablo for
>     sharing the ideas. FWIW adding here, we discussed this in terms of
>     Core Beam Transform Load Tests that we are working on right now [1].
>     If generating synthetic data will be possible for portable streaming
>     pipelines, we could use it in our work to test Python streaming
>     scenarios.
> 
>     [1] _https://s.apache.org/GVMa_
> 
>     pt., 28 wrz 2018 o 08:18 Pablo Estrada <pabloem@google.com
>     <ma...@google.com>> napisał(a):
> 
>         Hi Thomas, all,
>         yes, this is quite important for testing, and in fact I'd think
>         it's important to streamline the insertion of native sources
>         from different runners to make the current runners more usable.
>         But that's another topic.
> 
>         For generators of synthetic data, I had a couple ideas (and this
>         will show my limited knowledge about Flink and Streaming, but oh
>         well):
> 
>         - Flink experts: Is it possible to add a pure-Beam generator
>         that will do something like: Impulse -> ParDo(generate multiple
>         elements) -> Forced "Write" to Flink (e.g. something like a
>         reshuffle), and then have Flink manage the parallelism for
>         stages downstream from that?
> 
>         - If this is not possible, it may be worth writing some
>         transform in Flink / other runners that can be plugged in by
>         inserting a custom URN. In fact, it may be a good idea to
>         streamline the insertion of native sources for each runner based
>         on some sort of CustomURNTransform() ?
> 
>         I hope I did not butcher those explanations too badly...
>         Best
>         -P.
> 
>         On Thu, Sep 27, 2018, 5:55 PM Thomas Weise <thw@apache.org
>         <ma...@apache.org>> wrote:
> 
>             There were a few discussions how we can facilitate testing
>             for portable streaming pipelines with the Flink runner. The
>             problem is that we currently don't have streaming sources in
>             the Python SDK.
> 
>             One way to support testing could be a generator that extends
>             the idea of Impulse to provide a Flink native trigger
>             transform, optionally parameterized with an interval and max
>             count.
> 
>             Test pipelines could then follow the generator with a Map
>             function that creates whatever payloads are desirable.
> 
>             Thoughts?
> 
>             Thanks,
>             Thomas
> 

Re: Portable Flink runner: Generator source for testing

Posted by Thomas Weise <th...@apache.org>.
Thanks for sharing the link, this looks very promising!

For the synthetic source, if we need a runner native trigger mechanism,
then it should probably just emit an empty byte array like the impulse
implementation does, and everything else could be left to SDK specific
transforms that are downstream. We don't have support for timers in the
portable Flink runner yet. With timers, there would not be the need for a
runner native URN and it could work just like Pablo described.


On Fri, Sep 28, 2018 at 3:09 AM Łukasz Gajowy <lu...@gmail.com>
wrote:

> Hi all,
>
> thank you, Thomas, for starting this discussion and Pablo for sharing the
> ideas. FWIW adding here, we discussed this in terms of Core Beam Transform
> Load Tests that we are working on right now [1]. If generating synthetic
> data will be possible for portable streaming pipelines, we could use it in
> our work to test Python streaming scenarios.
>
> [1] *https://s.apache.org/GVMa <https://s.apache.org/GVMa>*
>
> pt., 28 wrz 2018 o 08:18 Pablo Estrada <pa...@google.com> napisał(a):
>
>> Hi Thomas, all,
>> yes, this is quite important for testing, and in fact I'd think it's
>> important to streamline the insertion of native sources from different
>> runners to make the current runners more usable. But that's another topic.
>>
>> For generators of synthetic data, I had a couple ideas (and this will
>> show my limited knowledge about Flink and Streaming, but oh well):
>>
>> - Flink experts: Is it possible to add a pure-Beam generator that will do
>> something like: Impulse -> ParDo(generate multiple elements) -> Forced
>> "Write" to Flink (e.g. something like a reshuffle), and then have Flink
>> manage the parallelism for stages downstream from that?
>>
>> - If this is not possible, it may be worth writing some transform in
>> Flink / other runners that can be plugged in by inserting a custom URN. In
>> fact, it may be a good idea to streamline the insertion of native sources
>> for each runner based on some sort of CustomURNTransform() ?
>>
>> I hope I did not butcher those explanations too badly...
>> Best
>> -P.
>>
>> On Thu, Sep 27, 2018, 5:55 PM Thomas Weise <th...@apache.org> wrote:
>>
>>> There were a few discussions how we can facilitate testing for portable
>>> streaming pipelines with the Flink runner. The problem is that we currently
>>> don't have streaming sources in the Python SDK.
>>>
>>> One way to support testing could be a generator that extends the idea of
>>> Impulse to provide a Flink native trigger transform, optionally
>>> parameterized with an interval and max count.
>>>
>>> Test pipelines could then follow the generator with a Map function that
>>> creates whatever payloads are desirable.
>>>
>>> Thoughts?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>

Re: Portable Flink runner: Generator source for testing

Posted by Łukasz Gajowy <lu...@gmail.com>.
Hi all,

thank you, Thomas, for starting this discussion and Pablo for sharing the
ideas. FWIW adding here, we discussed this in terms of Core Beam Transform
Load Tests that we are working on right now [1]. If generating synthetic
data will be possible for portable streaming pipelines, we could use it in
our work to test Python streaming scenarios.

[1] *https://s.apache.org/GVMa <https://s.apache.org/GVMa>*

pt., 28 wrz 2018 o 08:18 Pablo Estrada <pa...@google.com> napisał(a):

> Hi Thomas, all,
> yes, this is quite important for testing, and in fact I'd think it's
> important to streamline the insertion of native sources from different
> runners to make the current runners more usable. But that's another topic.
>
> For generators of synthetic data, I had a couple ideas (and this will show
> my limited knowledge about Flink and Streaming, but oh well):
>
> - Flink experts: Is it possible to add a pure-Beam generator that will do
> something like: Impulse -> ParDo(generate multiple elements) -> Forced
> "Write" to Flink (e.g. something like a reshuffle), and then have Flink
> manage the parallelism for stages downstream from that?
>
> - If this is not possible, it may be worth writing some transform in Flink
> / other runners that can be plugged in by inserting a custom URN. In fact,
> it may be a good idea to streamline the insertion of native sources for
> each runner based on some sort of CustomURNTransform() ?
>
> I hope I did not butcher those explanations too badly...
> Best
> -P.
>
> On Thu, Sep 27, 2018, 5:55 PM Thomas Weise <th...@apache.org> wrote:
>
>> There were a few discussions how we can facilitate testing for portable
>> streaming pipelines with the Flink runner. The problem is that we currently
>> don't have streaming sources in the Python SDK.
>>
>> One way to support testing could be a generator that extends the idea of
>> Impulse to provide a Flink native trigger transform, optionally
>> parameterized with an interval and max count.
>>
>> Test pipelines could then follow the generator with a Map function that
>> creates whatever payloads are desirable.
>>
>> Thoughts?
>>
>> Thanks,
>> Thomas
>>
>>

Re: Portable Flink runner: Generator source for testing

Posted by Pablo Estrada <pa...@google.com>.
Hi Thomas, all,
yes, this is quite important for testing, and in fact I'd think it's
important to streamline the insertion of native sources from different
runners to make the current runners more usable. But that's another topic.

For generators of synthetic data, I had a couple ideas (and this will show
my limited knowledge about Flink and Streaming, but oh well):

- Flink experts: Is it possible to add a pure-Beam generator that will do
something like: Impulse -> ParDo(generate multiple elements) -> Forced
"Write" to Flink (e.g. something like a reshuffle), and then have Flink
manage the parallelism for stages downstream from that?

- If this is not possible, it may be worth writing some transform in Flink
/ other runners that can be plugged in by inserting a custom URN. In fact,
it may be a good idea to streamline the insertion of native sources for
each runner based on some sort of CustomURNTransform() ?

I hope I did not butcher those explanations too badly...
Best
-P.

On Thu, Sep 27, 2018, 5:55 PM Thomas Weise <th...@apache.org> wrote:

> There were a few discussions how we can facilitate testing for portable
> streaming pipelines with the Flink runner. The problem is that we currently
> don't have streaming sources in the Python SDK.
>
> One way to support testing could be a generator that extends the idea of
> Impulse to provide a Flink native trigger transform, optionally
> parameterized with an interval and max count.
>
> Test pipelines could then follow the generator with a Map function that
> creates whatever payloads are desirable.
>
> Thoughts?
>
> Thanks,
> Thomas
>
>