Posted to user@beam.apache.org by Joey Tran <jo...@schrodinger.com> on 2023/06/22 12:39:52 UTC

Getting Started With Implementing a Runner

Hi Beam community!

I'm interested in trying to implement a runner with my company's execution
environment but I'm struggling to get started. I've read the docs page
<https://beam.apache.org/contribute/runner-guide/#testing-your-runner> on
implementing a runner but it's quite high level. Anyone have any concrete
suggestions on getting started?

I've started by cloning and running the hello world example
<https://github.com/apache/beam-starter-python>. I've then subclassed
`PipelineRunner`
<https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>
to create my own custom runner but at this point I'm a bit stuck. My custom
runner just looks like

class CustomRunner(runner.PipelineRunner):
    def run_pipeline(self, pipeline,
                     options):
        self.visit_transforms(pipeline, options)

And when using it I get an error about not having implemented "Impulse"

NotImplementedError: Execution of [<Impulse(PTransform) label=[Impulse]>]
not implemented in runner <my_app.app.CustomRunner object at 0x135d9ff40>.

Am I going about this the right way? Are there tests I can run my custom
runner against to validate it beyond just running the hello world example?
I'm finding myself just digging through the beam source to try to piece
together how a runner works and I'm struggling to get a foothold. Any
guidance would be greatly appreciated, especially if anyone has any
experience implementing their own python runner.

Thanks in advance! Also, could I get a Slack invite?
Cheers,
Joey

Re: Getting Started With Implementing a Runner

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
I took a first pass at
https://github.com/apache/beam/blob/be19140f3e9194721f36e57f4a946adc6c43971a/website/www/site/content/en/contribute/runner-guide.md

https://github.com/apache/beam/blob/1cfc0fdc6ff27ad70365683fdc8264f42642f6e9/sdks/python/apache_beam/runners/trivial_runner.py
may also be of interest.
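
To make the "execute these in topological order" suggestion from earlier in the thread concrete, here is a minimal sketch. It does not use any Beam API: `Stage`, `run_stages`, and the PCollection ids are hypothetical stand-ins for the fused stages that `create_stages` would produce, and it assumes Python 3.9+ for `graphlib`.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter
from typing import Callable, Dict, Iterable, List


@dataclass
class Stage:
    """Hypothetical stand-in for a fused stage; NOT a Beam class."""
    name: str
    inputs: List[str]  # ids of the PCollections this stage reads
    output: str        # id of the PCollection this stage writes
    fn: Callable[[Dict[str, list]], list]  # fused user code for the stage


def run_stages(stages: Iterable[Stage]) -> Dict[str, list]:
    """Run every stage after the stages that produce its inputs."""
    stages = list(stages)
    by_name = {s.name: s for s in stages}
    producer = {s.output: s.name for s in stages}
    # Map each stage to the set of stages it depends on.
    graph = {s.name: {producer[i] for i in s.inputs} for s in stages}
    pcollections: Dict[str, list] = {}
    for name in TopologicalSorter(graph).static_order():
        stage = by_name[name]
        stage_inputs = {i: pcollections[i] for i in stage.inputs}
        pcollections[stage.output] = stage.fn(stage_inputs)
    return pcollections


# Stages are deliberately listed out of order; the sorter fixes that.
stages = [
    Stage("lengths", ["words"], "lengths",
          lambda pc: [(w, len(w)) for w in pc["words"]]),
    Stage("split", ["lines"], "words",
          lambda pc: [w for line in pc["lines"] for w in line.split()]),
    Stage("read", [], "lines",
          lambda pc: ["hello beam", "hello runner"]),
]
results = run_stages(stages)
```

In a real runner, the body of the loop is where each stage would be handed to the execution environment (for example as one bundle per worker), rather than called inline.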

On Fri, Jul 21, 2023 at 7:25 AM Joey Tran <jo...@schrodinger.com> wrote:
>
> Could you let me know when you update it? I would be interested in rereading after the rewrite.
>
> Thanks!
> Joey
>
> On Fri, Jul 14, 2023 at 4:38 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>> I'm taking an action item to update that page, as it is *way* out of date.
>>
>> On Thu, Jul 13, 2023 at 6:54 PM Joey Tran <jo...@schrodinger.com> wrote:
>>>
>>> I see. I guess I got a little confused since these are mentioned in the Authoring a Runner docs page which implied to me that they'd be safe to use. I'll check out the bundle_processor. Thanks!
>>>
>>> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>
>>>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <jo...@schrodinger.com> wrote:
>>>>>
>>>>> Working on this on and off now and getting some pretty good traction.
>>>>>
>>>>> One thing I'm a little worried about is all the classes that are marked "internal use only". A lot of these seem either very useful or possibly critical to writing a runner. How strictly should I interpret these private implementation labels?
>>>>>
>>>>> A few bits that I'm interested in using ordered by how surprised I was to find that they're internal only.
>>>>>
>>>>>  - apache_beam.pipeline.AppliedPTransform
>>>>>  - apache_beam.pipeline.PipelineVisitor
>>>>>  - apache_beam.runners.common.DoFnRunner
>>>>
>>>>
>>>> The public API is the protos. You should not have to interact with AppliedPTransform and PipelineVisitor directly (and while you can reach in and do so, there are no promises here and these are subject to change). As for DoFnRunner, if you're trying to reach in at this level you're probably going to have to be replicating a bunch of surrounding infrastructure as well. I would recommend using a BundleProcessor [1] to coordinate the work (which will internally wire up the chain of DoFns correctly and take them through their proper lifecycle). As mentioned above, you can directly borrow the translations in fn_api_runner to go from a full Pipeline graph (proto) to a set of fused DoFns to execute in topological order (as ProcessBundleDescriptor protos, which is what BundleProcessor accepts).
>>>>
>>>> [1] https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>>>
>>>>>
>>>>> Thanks again for the help,
>>>>> Joey
>>>>>
>>>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <ch...@google.com> wrote:
>>>>>>
>>>>>> Another advantage of a portable runner would be that it will be using well-defined and backwards-compatible Beam portable APIs to communicate with SDKs. I think this is especially important for runners that do not live in the Beam repo, since otherwise future SDK releases could break your runner in subtle ways. Also, portability gives you more flexibility when it comes to choosing an SDK to define the pipeline and will allow you to execute transforms in any SDK via cross-language.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <us...@beam.apache.org> wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <jo...@schrodinger.com> wrote:
>>>>>>>>>
>>>>>>>>> Totally doable by one person, especially given the limited feature set you mention above. https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is a good starting point as to what the relationship between a Runner and the SDK is at a level of detail sufficient for implementation (told from the perspective of an SDK, but the story is largely about the interface which is directly applicable).
>>>>>>>>
>>>>>>>>
>>>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>>>
>>>>>>>> I hadn't realized there was a concept of an "SDK Worker", I had imagined that once the Runner started execution of a workflow, it was Runner all the way down. Is the Fn API the only way to implement a runner? Our execution environment is a bit constrained in such a way that we can't expose the APIs required to implement the Fn API. To be forthright, we basically only have the ability to start a worker either with a known Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or with a bundle of data to process and return the outputs for. We're constrained from really any additional communication with a worker beyond that.
>>>>>>>
>>>>>>>
>>>>>>> The "worker" abstraction gives the ability to wrap any user code in a way that it can be called from any runner. If you're willing to constrain the code you're wrapping (e.g. "Python DoFns only") then this "worker" can be a logical, rather than physical, concept.
>>>>>>>
>>>>>>> Another way to look at it is that in practice, the "runner" often has its own notion of "workers" which wrap (often in a 1:1 way) the logical "SDK Worker" (which in turn invokes the actual DoFns). This latter may be inlined (e.g. if it's 100% Python on both sides). See, for example, https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>>>
>>>>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <jo...@schrodinger.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks all for the responses!
>>>>>>>>>>
>>>>>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then, at first, I’d suggest answering two questions for yourself:
>>>>>>>>>>> - Am I going to implement a portable runner or a native one?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Portable sounds great, but the answer depends on how much additional cost it'd require to implement portable over non-portable, even considering future deprecation (unless deprecation is happening tomorrow). I'm not familiar enough to know what the additional cost is so I don't have a firm answer.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I would say it would not be that expensive to write it in a "portable compatible" way (i.e., it uses the publicly documented protocol as the interface rather than reaching into internal details) even if it doesn't use gRPC and fire up separate processes/docker images for the workers (preferring to do all of that inline like the Python portable direct runner does).
>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - Which SDK should I use for this runner?
>>>>>>>>>>
>>>>>>>>>> I'd be developing this runner against the Python SDK, and if the runner only worked with the Python SDK that'd be okay in the short term.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes. And if you do it the above way, it should be easy to extend (or not) if/when the need arises.
>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Also, we don’t know if this new runner will be contributed back to Beam, what the runtime is, or what its final goal actually is.
>>>>>>>>>>
>>>>>>>>>> Likely won't be contributed back to Beam (not sure if it'd actually be useful to a wide audience anyways).
>>>>>>>>>>
>>>>>>>>>> The context is we've been developing an in-house large-scale pipeline framework that encapsulates both the programming model and the runner/execution of data workflows. As it's grown, I keep finding myself just reimplementing features and abstractions Beam has already implemented, so I wanted to explore adopting Beam. Our execution environment is very particular though and our workflows require it (due to the way we license our software), so my plan was to try to create a very basic runner that uses our execution environment. The runner could have very few features e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably introduce a shim for some of our internally implemented transforms and assess from there.
>>>>>>>>>>
>>>>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your thoughts as to whether this seems reasonable and achievable without a large concerted effort or even if the general idea makes any sense. (I recognize that it might not be easy, but I don't have the resources to dedicate more than myself to work on a PoC)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Totally doable by one person, especially given the limited feature set you mention above. https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is a good starting point as to what the relationship between a Runner and the SDK is at a level of detail sufficient for implementation (told from the perspective of an SDK, but the story is largely about the interface which is directly applicable).
>>>>>>>>>
>>>>>>>>> Given the limited feature set you proposed, this is similar to the original Python portable runner, which took a week or two to put together (granted, a lot has been added since then), or the TypeScript direct runner ( https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts ), which was done (in its basic form, no support for side inputs and such) in less than a week. Granted, as these are local runners, this illustrates only the Beam-side complexity of things (not the work of actually implementing a distributed shuffle, starting and assigning work to multiple workers, etc.), but presumably that's the kind of thing your execution environment already takes care of.
>>>>>>>>>
>>>>>>>>> As for some more concrete pointers, you could probably leverage a lot of what's there by invoking create_stages
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>>>>
>>>>>>>>> which will do optimization, fusion, etc. and then implementing your own version of run_stages
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>>>>
>>>>>>>>> to execute these in topological order on your compute infrastructure. (If you're not doing streaming, this is much more straightforward than all the bundler scheduler stuff that currently exists in that code).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <ar...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <us...@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <ar...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then, at first, I’d suggest answering two questions for yourself:
>>>>>>>>>>>> - Am I going to implement a portable runner or a native one?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The answer to this should be portable, as non-portable ones will be deprecated.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Well, actually this is a question that I don’t remember us discussing here in detail before or reaching a common agreement on.
>>>>>>>>>>>
>>>>>>>>>>> Actually, I’m not sure that I clearly understand what is meant by “deprecation” in this case. For example, the Portable Spark Runner is actually heavily based on the native Spark RDD runner and its translations. So, which part should be deprecated, and what is the reason for that?
>>>>>>>>>>>
>>>>>>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>>>>>>
>>>>>>>>>>> Also, we don’t know if this new runner will be contributed back to Beam, what the runtime is, or what its final goal actually is.
>>>>>>>>>>> So I agree that more details on this would be useful.
>>>>>>>>>>>
>>>>>>>>>>> —
>>>>>>>>>>> Alexey
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> - Which SDK should I use for this runner?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>>>>
>>>>>>>>>>> On a more serious note, could you tell us a bit more about the runner you're looking at implementing?
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Then, depending on the answers, I’d suggest taking one of the most similar Beam runners as an example and using it as a more detailed source of information, along with the Beam runner doc mentioned before.
>>>>>>>>>>>>
>>>>>>>>>>>> —
>>>>>>>>>>>> Alexey
>>>>>>>>>>>>
>>>>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <jo...@schrodinger.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Beam community!
>>>>>>>>>>>>
>>>>>>>>>>>> I'm interested in trying to implement a runner with my company's execution environment but I'm struggling to get started. I've read the docs page on implementing a runner but it's quite high level. Anyone have any concrete suggestions on getting started?
>>>>>>>>>>>>
>>>>>>>>>>>> I've started by cloning and running the hello world example. I've then subclassed `PipelineRunner` to create my own custom runner but at this point I'm a bit stuck. My custom runner just looks like
>>>>>>>>>>>>
>>>>>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>>>>>     def run_pipeline(self, pipeline,
>>>>>>>>>>>>                      options):
>>>>>>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>>>>>>
>>>>>>>>>>>> And when using it I get an error about not having implemented "Impulse"
>>>>>>>>>>>>
>>>>>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform) label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object at 0x135d9ff40>.
>>>>>>>>>>>>
>>>>>>>>>>>> Am I going about this the right way? Are there tests I can run my custom runner against to validate it beyond just running the hello world example? I'm finding myself just digging through the beam source to try to piece together how a runner works and I'm struggling to get a foothold. Any guidance would be greatly appreciated, especially if anyone has any experience implementing their own python runner.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Joey
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>

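The advice in the thread about letting a bundle processor "take them through their proper lifecycle" can be illustrated with a small sketch of a logical, inline worker that receives one bundle and returns its outputs. This is not Beam's `BundleProcessor`; `LifecycleFn`, `SplitWords`, `UpperCase`, and `process_bundle` are hypothetical names, and only the lifecycle method names (`setup`, `start_bundle`, `process`, `finish_bundle`, `teardown`) are borrowed from Beam's DoFn. As a simplification, it runs each function over the whole bundle in turn and ignores outputs emitted from `finish_bundle`.

```python
from typing import Any, Iterable, List


class LifecycleFn:
    """Hypothetical DoFn-like base class that records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def setup(self) -> None:
        self.calls.append("setup")

    def start_bundle(self) -> None:
        self.calls.append("start_bundle")

    def process(self, element: Any) -> Iterable[Any]:
        raise NotImplementedError

    def finish_bundle(self) -> None:
        self.calls.append("finish_bundle")

    def teardown(self) -> None:
        self.calls.append("teardown")


class SplitWords(LifecycleFn):
    def process(self, element: str) -> Iterable[str]:
        yield from element.split()


class UpperCase(LifecycleFn):
    def process(self, element: str) -> Iterable[str]:
        yield element.upper()


def process_bundle(fns: List[LifecycleFn], bundle: Iterable[Any]) -> List[Any]:
    """Push one bundle through a chain of functions, honoring the lifecycle."""
    for fn in fns:
        fn.setup()
    try:
        for fn in fns:
            fn.start_bundle()
        elements = list(bundle)
        # Each function consumes the previous function's full output.
        for fn in fns:
            elements = [out for e in elements for out in fn.process(e)]
        for fn in fns:
            fn.finish_bundle()
        return elements
    finally:
        # teardown runs even if processing raised.
        for fn in fns:
            fn.teardown()
```

A call such as `process_bundle([SplitWords(), UpperCase()], ["hello beam"])` matches the "bundle of data in, outputs back" shape described for the constrained execution environment.
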
Re: Getting Started With Implementing a Runner

Posted by Joey Tran <jo...@schrodinger.com>.
Could you let me know when you update it? I would be interested in
rereading after the rewrite.

Thanks!
Joey

On Fri, Jul 14, 2023 at 4:38 PM Robert Bradshaw <ro...@google.com> wrote:

> I'm taking an action item to update that page, as it is *way* out of date.
>
> On Thu, Jul 13, 2023 at 6:54 PM Joey Tran <jo...@schrodinger.com>
> wrote:
>
>> I see. I guess I got a little confused since these are mentioned in the Authoring
>> a Runner
>> <https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs
>> page which implied to me that they'd be safe to use. I'll check out the
>> bundle_processor. Thanks!
>>
>> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <jo...@schrodinger.com>
>>> wrote:
>>>
>>>> Working on this on and off now and getting some pretty good traction.
>>>>
>>>> One thing I'm a little worried about is all the classes that are marked
>>>> "internal use only". A lot of these seem either very useful or possibly
>>>> critical to writing a runner. How strictly should I interpret these private
>>>> implementation labels?
>>>>
>>>> A few bits that I'm interested in using ordered by how surprised I was
>>>> to find that they're internal only.
>>>>
>>>>  - apache_beam.pipeline.AppliedPTransform
>>>>  - apache_beam.pipeline.PipelineVisitor
>>>>  - apache_beam.runners.common.DoFnRunner
>>>>
>>>
>>> The public API is the protos. You should not have to interact
>>> with AppliedPTransform and PipelineVisitor directly (and while you can
>>> reach in and do so, there are no promises here and these are subject to
>>> change). As for DoFnRunner, if you're trying to reach in at this level
>>> you're probably going to have to be replicating a bunch of surrounding
>>> infrastructure as well. I would recommend using a BundleProcessor [1] to
>>> coordinate the work (which will internally wire up the chain of DoFns
>>> correctly and take them through their proper lifecycle). As mentioned
>>> above, you can directly borrow the translations in fn_api_runner to go from
>>> a full Pipeline graph (proto) to a set of fused DoFns to execute in
>>> topological order (as ProcessBundleDescriptor protos, which is
>>> what BundleProcessor accepts).
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>>
>>>
>>>> Thanks again for the help,
>>>> Joey
>>>>
>>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Another advantage of a portable runner would be that it will be using
>>>>> well-defined and backwards-compatible Beam portable APIs to communicate
>>>>> with SDKs. I think this is especially important for runners that do not live
>>>>> in the Beam repo, since otherwise future SDK releases could break your
>>>>> runner in subtle ways. Also, portability gives you more flexibility when it
>>>>> comes to choosing an SDK to define the pipeline and will allow you to
>>>>> execute transforms in any SDK via cross-language.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>>>>> user@beam.apache.org> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <jo...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>>> set you mention above.
>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is
>>>>>>>> a good starting point as to what the relationship between a Runner and the
>>>>>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>>>>>> perspective of an SDK, but the story is largely about the interface which
>>>>>>>> is directly applicable).
>>>>>>>
>>>>>>>
>>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>>
>>>>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>>>>> imagined that once the Runner started execution of a workflow, it was
>>>>>>> Runner all the way down. Is the Fn API the only way to implement a runner?
>>>>>>> Our execution environment is a bit constrained in such a way that we can't
>>>>>>> expose the APIs required to implement the Fn API. To be forthright, we
>>>>>>> basically only have the ability to start a worker either with a known
>>>>>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or with
>>>>>>> a bundle of data to process and return the outputs for. We're constrained
>>>>>>> from really any additional communication with a worker beyond that.
>>>>>>>
>>>>>>
>>>>>> The "worker" abstraction gives the ability to wrap any user code in a
>>>>>> way that it can be called from any runner. If you're willing to constrain
>>>>>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" can
>>>>>> be a logical, rather than physical, concept.
>>>>>>
>>>>>> Another way to look at it is that in practice, the "runner" often has
>>>>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical
>>>>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be
>>>>>> inlined (e.g. if it's 100% Python on both sides). See, for example,
>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>>>
>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <ro...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <
>>>>>>>> joey.tran@schrodinger.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks all for the responses!
>>>>>>>>>
>>>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you,
>>>>>>>>>> then, at first, I’d suggest answering two questions for yourself:
>>>>>>>>>> - Am I going to implement a portable runner or a native one?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Portable sounds great, but the answer depends on how much
>>>>>>>>> additional cost it'd require to implement portable over non-portable, even
>>>>>>>>> considering future deprecation (unless deprecation is happening tomorrow).
>>>>>>>>> I'm not familiar enough to know what the additional cost is so I don't have
>>>>>>>>> a firm answer.
>>>>>>>>>
>>>>>>>>
>>>>>>>> I would say it would not be that expensive to write it in a
>>>>>>>> "portable compatible" way (i.e., it uses the publicly documented protocol as
>>>>>>>> the interface rather than reaching into internal details) even if it
>>>>>>>> doesn't use gRPC and fire up separate processes/docker images for the
>>>>>>>> workers (preferring to do all of that inline like the Python portable
>>>>>>>> direct runner does).
>>>>>>>>
>>>>>>>>
>>>>>>>>> - Which SDK should I use for this runner?
>>>>>>>>>>
>>>>>>>>> I'd be developing this runner against the Python SDK, and if the
>>>>>>>>> runner only worked with the Python SDK that'd be okay in the short term.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Yes. And if you do it the above way, it should be easy to extend
>>>>>>>> (or not) if/when the need arises.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>>>> Beam, what the runtime is, or what its final goal actually is.
>>>>>>>>>
>>>>>>>>> Likely won't be contributed back to Beam (not sure if it'd
>>>>>>>>> actually be useful to a wide audience anyways).
>>>>>>>>>
>>>>>>>>> The context is we've been developing an in-house large-scale
>>>>>>>>> pipeline framework that encapsulates both the programming model and the
>>>>>>>>> runner/execution of data workflows. As it's grown, I keep finding myself
>>>>>>>>> just reimplementing features and abstractions Beam has already implemented,
>>>>>>>>> so I wanted to explore adopting Beam. Our execution environment is very
>>>>>>>>> particular though and our workflows require it (due to the way we license
>>>>>>>>> our software), so my plan was to try to create a very basic runner that
>>>>>>>>> uses our execution environment. The runner could have very few features
>>>>>>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
>>>>>>>>> introduce a shim for some of our internally implemented transforms and
>>>>>>>>> assess from there.
>>>>>>>>>
>>>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your
>>>>>>>>> thoughts as to whether this seems reasonable and achievable without a large
>>>>>>>>> concerted effort or even if the general idea makes any sense. (I recognize
>>>>>>>>> that it might not be *easy*, but I don't have the resources to
>>>>>>>>> dedicate more than myself to work on a PoC)
>>>>>>>>>
>>>>>>>>
>>>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>>> set you mention above.
>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>> is a good starting point as to what the relationship between a Runner and
>>>>>>>> the SDK is at a level of detail sufficient for implementation (told from
>>>>>>>> the perspective of an SDK, but the story is largely about the interface
>>>>>>>> which is directly applicable).
>>>>>>>>
>>>>>>>> Given the limited feature set you proposed, this is similar to the
>>>>>>>> original Python portable runner which took a week or two to put together
>>>>>>>> (granted, a lot has been added since then), or the TypeScript direct runner
>>>>>>>> (
>>>>>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>>>>>>> ), which was done (in its basic form, no support for side inputs and such)
>>>>>>>> in less than a week. Granted, as these are local runners, this illustrates
>>>>>>>> only the Beam-side complexity of things (not the work of actually
>>>>>>>> implementing a distributed shuffle, starting and assigning work to multiple
>>>>>>>> workers, etc.), but presumably that's the kind of thing your execution
>>>>>>>> environment already takes care of.
>>>>>>>>
>>>>>>>> As for some more concrete pointers, you could probably leverage a
>>>>>>>> lot of what's there by invoking create_stages
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>>>
>>>>>>>> which will do optimization, fusion, etc. and then implementing your
>>>>>>>> own version of run_stages
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>>>
>>>>>>>> to execute these in topological order on your compute
>>>>>>>> infrastructure. (If you're not doing streaming, this is much more
>>>>>>>> straightforward than all the bundler scheduler stuff that currently exists
>>>>>>>> in that code).
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you,
>>>>>>>>>>> then, at first, I’d suggest answering two questions for yourself:
>>>>>>>>>>> - Am I going to implement a portable runner or a native one?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The answer to this should be portable, as non-portable ones will
>>>>>>>>>> be deprecated.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Well, actually this is a question that I don’t remember us
>>>>>>>>>> discussing here in detail before or reaching a common agreement on.
>>>>>>>>>>
>>>>>>>>>> Actually, I’m not sure that I clearly understand what is meant by
>>>>>>>>>> “deprecation” in this case. For example, the Portable Spark Runner is
>>>>>>>>>> actually heavily based on the native Spark RDD runner and its translations.
>>>>>>>>>> So, which part should be deprecated, and what is the reason for that?
>>>>>>>>>>
>>>>>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>>>>>
>>>>>>>>>> Also, we don’t know if this new runner will be contributed back
>>>>>>>>>> to Beam, what the runtime is, or what its final goal actually is.
>>>>>>>>>> So I agree that more details on this would be useful.
>>>>>>>>>>
>>>>>>>>>> —
>>>>>>>>>> Alexey
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - Which SDK should I use for this runner?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>>>
>>>>>>>>>> On a more serious note, could you tell us a bit more about the
>>>>>>>>>> runner you're looking at implementing?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Then, depending on the answers, I’d suggest taking one of the most
>>>>>>>>>>> similar Beam runners as an example and using it as a more detailed source
>>>>>>>>>>> of information, along with the Beam runner doc mentioned before.
>>>>>>>>>>>
>>>>>>>>>>> —
>>>>>>>>>>> Alexey
>>>>>>>>>>>
>>>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <jo...@schrodinger.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Beam community!
>>>>>>>>>>>
>>>>>>>>>>> I'm interested in trying to implement a runner with my company's
>>>>>>>>>>> execution environment but I'm struggling to get started. I've read the docs
>>>>>>>>>>> page
>>>>>>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>>>>>>>>> on implementing a runner but it's quite high level. Anyone have any
>>>>>>>>>>> concrete suggestions on getting started?
>>>>>>>>>>>
>>>>>>>>>>> I've started by cloning and running the hello world example
>>>>>>>>>>> <https://github.com/apache/beam-starter-python>. I've then
>>>>>>>>>>> subclassed `PipelineRunner
>>>>>>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>>>>>>>>> to create my own custom runner but at this point I'm a bit stuck. My custom
>>>>>>>>>>> runner just looks like
>>>>>>>>>>>
>>>>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>>>>     def run_pipeline(self, pipeline,
>>>>>>>>>>>                      options):
>>>>>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>>>>>
>>>>>>>>>>> And when using it I get an error about not having implemented
>>>>>>>>>>> "Impulse"
>>>>>>>>>>>
>>>>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>>>>>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object
>>>>>>>>>>> at 0x135d9ff40>.
>>>>>>>>>>>
>>>>>>>>>>> Am I going about this the right way? Are there tests I can run
>>>>>>>>>>> my custom runner against to validate it beyond just running the hello world
>>>>>>>>>>> example? I'm finding myself just digging through the beam source to try to
>>>>>>>>>>> piece together how a runner works and I'm struggling to get a foothold. Any
>>>>>>>>>>> guidance would be greatly appreciated, especially if anyone has any
>>>>>>>>>>> experience implementing their own python runner.
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Joey
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>

Re: Getting Started With Implementing a Runner

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
I'm taking an action item to update that page, as it is *way* out of date.

On Thu, Jul 13, 2023 at 6:54 PM Joey Tran <jo...@schrodinger.com> wrote:

> I see. I guess I got a little confused since these are mentioned in the Authoring
> a Runner
> <https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs
> page which implied to me that they'd be safe to use. I'll check out the
> bundle_processor. Thanks!
>
> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <jo...@schrodinger.com>
>> wrote:
>>
>>> Working on this on and off now and getting some pretty good traction.
>>>
>>> One thing I'm a little worried about is all the classes that are marked
>>> "internal use only". A lot of these seem either very useful or possibly
>>> critical to writing a runner. How strictly should I interpret these private
>>> implementation labels?
>>>
>>> A few bits that I'm interested in using ordered by how surprised I was
>>> to find that they're internal only.
>>>
>>>  - apache_beam.pipeline.AppliedPTransform
>>>  - apache_beam.pipeline.PipelineVisitor
>>>  - apache_beam.runners.common.DoFnRunner
>>>
>>
>> The public API is the protos. You should not have to interact
>> with AppliedPTransform and PipelineVisitor directly (and while you can
>> reach in and do so, there are no promises here and these are subject to
>> change). As for DoFnRunner, if you're trying to reach in at this level
>> you're probably going to have to be replicating a bunch of surrounding
>> infrastructure as well. I would recommend using a BundleProcessor [1] to
>> coordinate the work (which will internally wire up the chain of DoFns
>> correctly and take them through their proper lifecycle). As mentioned
>> above, you can directly borrow the translations in fn_api_runner to go from
>> a full Pipeline graph (proto) to a set of fused DoFns to execute in
>> topological order (as ProcessBundleDescriptor protos, which is
>> what BundleProcessor accepts).
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>
>>
>>> Thanks again for the help,
>>> Joey
>>>
>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>> Another advantage of a portable runner would be that it will be using
>>>> well defined and backwards compatible Beam portable APIs to communicate
>>>> with SDKs. I think this is especially important for runners that do not live
>>>> in the Beam repo since otherwise future SDK releases could break your
>>>> runner in subtle ways. Also portability gives you more flexibility when it
>>>> comes to choosing an SDK to define the pipeline and will allow you to
>>>> execute transforms in any SDK via cross-language.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <jo...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>> set you mention above.
>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is
>>>>>>> a good starting point as to what the relationship between a Runner and the
>>>>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>>>>> perspective of an SDK, but the story is largely about the interface which
>>>>>>> is directly applicable).
>>>>>>
>>>>>>
>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>
>>>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>>>> imagined that once the Runner started execution of a workflow, it was
>>>>>> Runner all the way down. Is the Fn API the only way to implement a runner?
>>>>>> Our execution environment is a bit constrained in such a way that we can't
>>>>>> expose the APIs required to implement the Fn API. To be forthright, we
>>>>>> basically only have the ability to start a worker either with a known
>>>>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or with
>>>>>> a bundle of data to process and return the outputs for. We're constrained
>>>>>> from really any additional communication with a worker beyond that.
>>>>>>
>>>>>
>>>>> The "worker" abstraction gives the ability to wrap any user code in a
>>>>> way that it can be called from any runner. If you're willing to constrain
>>>>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" can
>>>>> be a logical, rather than physical, concept.
>>>>>
>>>>> Another way to look at it is that in practice, the "runner" often has
>>>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical
>>>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be
>>>>> inlined (e.g. if it's 100% Python on both sides). See, for example,
>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>>
>>>>>
>>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <
>>>>>>> joey.tran@schrodinger.com> wrote:
>>>>>>>
>>>>>>>> Thanks all for the responses!
>>>>>>>>
>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>>>> at first, I’d suggest to answer two questions for yourself:
>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>
>>>>>>>>
>>>>>>>> Portable sounds great, but the answer depends on how much
>>>>>>>> additional cost it'd require to implement portable over non-portable, even
>>>>>>>> considering future deprecation (unless deprecation is happening tomorrow).
>>>>>>>> I'm not familiar enough to know what the additional cost is so I don't have
>>>>>>>> a firm answer.
>>>>>>>>
>>>>>>>
>>>>>>> I would say it would not be that expensive to write it in a
>>>>>>> "portable compatible" way (i.e. it uses the publicly-documented protocol as
>>>>>>> the interface rather than reaching into internal details) even if it
>>>>>>> doesn't use gRPC and fire up separate processes/docker images for the
>>>>>>> workers (preferring to do all of that inline like the Python portable
>>>>>>> direct runner does).
>>>>>>>
>>>>>>>
>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>>
>>>>>>>> I'd be developing this runner against the python SDK and if the
>>>>>>>> runner only worked with the python SDK that'd be okay in the short term
>>>>>>>>
>>>>>>>
>>>>>>> Yes. And if you do it the above way, it should be easy to extend (or
>>>>>>> not) if/when the need arises.
>>>>>>>
>>>>>>>
>>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>>
>>>>>>>> Likely won't be contributed back to Beam (not sure if it'd actually
>>>>>>>> be useful to a wide audience anyways).
>>>>>>>>
>>>>>>>> The context is we've been developing an in-house large-scale
>>>>>>>> pipeline framework that encapsulates both the programming model and the
>>>>>>>> runner/execution of data workflows. As it's grown, I keep finding myself
>>>>>>>> just reimplementing features and abstractions Beam has already implemented,
>>>>>>>> so I wanted to explore adopting Beam. Our execution environment is very
>>>>>>>> particular though and our workflows require it (due to the way we license
>>>>>>>> our software), so my plan was to try to create a very basic runner that
>>>>>>>> uses our execution environment. The runner could have very few features
>>>>>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
>>>>>>>> introduce a shim for some of our internally implemented transforms and
>>>>>>>> assess from there.
>>>>>>>>
>>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your
>>>>>>>> thoughts as to whether this seems reasonable and achievable without a large
>>>>>>>> concerted effort or even if the general idea makes any sense. (I recognize
>>>>>>>> that it might not be *easy*, but I don't have the resources to
>>>>>>>> dedicate more than myself to work on a PoC)
>>>>>>>>
>>>>>>>
>>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>> set you mention above.
>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>> is a good starting point as to what the relationship between a Runner and
>>>>>>> the SDK is at a level of detail sufficient for implementation (told from
>>>>>>> the perspective of an SDK, but the story is largely about the interface
>>>>>>> which is directly applicable).
>>>>>>>
>>>>>>> Given the limited feature set you proposed, this is similar to the
>>>>>>> original Python portable runner which took a week or two to put together
>>>>>>> (granted a lot has been added since then), or the typescript direct runner
>>>>>>> (
>>>>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>>>>>> ) which was done (in its basic form, no support for side inputs and such)
>>>>>>> in less than a week. Granted, as these are local runners, this illustrates
>>>>>>> only the Beam-side complexity of things (not the work of actually
>>>>>>> implementing a distributed shuffle, starting and assigning work to multiple
>>>>>>> workers, etc.), but presumably that's the kind of thing your execution
>>>>>>> environment already takes care of.
>>>>>>>
>>>>>>> As for some more concrete pointers, you could probably leverage a
>>>>>>> lot of what's there by invoking create_stages
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>>
>>>>>>> which will do optimization, fusion, etc. and then implementing your
>>>>>>> own version of run_stages
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>>
>>>>>>> to execute these in topological order on your compute
>>>>>>> infrastructure. (If you're not doing streaming, this is much more
>>>>>>> straightforward than all the bundler scheduler stuff that currently exists
>>>>>>> in that code).
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you,
>>>>>>>>>> then, at first, I’d suggest to answer two questions for yourself:
>>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The answer to this should be portable, as non-portable ones will
>>>>>>>>> be deprecated.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Well, actually, I don’t remember us discussing this question here in
>>>>>>>>> detail before or reaching a common agreement on it.
>>>>>>>>>
>>>>>>>>> Actually, I’m not sure that I clearly understand what is meant by
>>>>>>>>> “deprecation” in this case. For example, the Portable Spark Runner is
>>>>>>>>> heavily based on the native Spark RDD runner and its translations. So, which
>>>>>>>>> part should be deprecated, and what is the reason for that?
>>>>>>>>>
>>>>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>>>>
>>>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>>> So I agree that more details on this would be useful.
>>>>>>>>>
>>>>>>>>> —
>>>>>>>>> Alexey
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>>
>>>>>>>>> On a more serious note, could you tell us a bit more about the
>>>>>>>>> runner you're looking at implementing?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Then, depending on answers, I’d suggest to take as an example one
>>>>>>>>>> of the most similar Beam runners and use it as a more detailed source of
>>>>>>>>>> information along with Beam runner doc mentioned before.
>>>>>>>>>>
>>>>>>>>>> —
>>>>>>>>>> Alexey
>>>>>>>>>>
>>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <jo...@schrodinger.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Beam community!
>>>>>>>>>>
>>>>>>>>>> I'm interested in trying to implement a runner with my company's
>>>>>>>>>> execution environment but I'm struggling to get started. I've read the docs
>>>>>>>>>> page
>>>>>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>>>>>>>> on implementing a runner but it's quite high level. Anyone have any
>>>>>>>>>> concrete suggestions on getting started?
>>>>>>>>>>
>>>>>>>>>> I've started by cloning and running the hello world example
>>>>>>>>>> <https://github.com/apache/beam-starter-python>. I've then
>>>>>>>>>> subclassed `PipelineRunner
>>>>>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>>>>>>>> to create my own custom runner but at this point I'm a bit stuck. My custom
>>>>>>>>>> runner just looks like
>>>>>>>>>>
>>>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>>>     def run_pipeline(self, pipeline,
>>>>>>>>>>                      options):
>>>>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>>>>
>>>>>>>>>> And when using it I get an error about not having implemented
>>>>>>>>>> "Impulse"
>>>>>>>>>>
>>>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>>>>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object
>>>>>>>>>> at 0x135d9ff40>.
>>>>>>>>>>
>>>>>>>>>> Am I going about this the right way? Are there tests I can run my
>>>>>>>>>> custom runner against to validate it beyond just running the hello world
>>>>>>>>>> example? I'm finding myself just digging through the beam source to try to
>>>>>>>>>> piece together how a runner works and I'm struggling to get a foothold. Any
>>>>>>>>>> guidance would be greatly appreciated, especially if anyone has any
>>>>>>>>>> experience implementing their own python runner.
>>>>>>>>>>
>>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>>> Cheers,
>>>>>>>>>> Joey
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>

Re: Getting Started With Implementing a Runner

Posted by Joey Tran <jo...@schrodinger.com>.
I see. I guess I got a little confused since these are mentioned in
the Authoring
a Runner
<https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs
page which implied to me that they'd be safe to use. I'll check out the
bundle_processor. Thanks!

On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <ro...@google.com> wrote:

> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <jo...@schrodinger.com>
> wrote:
>
>> Working on this on and off now and getting some pretty good traction.
>>
>> One thing I'm a little worried about is all the classes that are marked
>> "internal use only". A lot of these seem either very useful or possibly
>> critical to writing a runner. How strictly should I interpret these private
>> implementation labels?
>>
>> A few bits that I'm interested in using ordered by how surprised I was to
>> find that they're internal only.
>>
>>  - apache_beam.pipeline.AppliedPTransform
>>  - apache_beam.pipeline.PipelineVisitor
>>  - apache_beam.runners.common.DoFnRunner
>>
>
> The public API is the protos. You should not have to interact
> with AppliedPTransform and PipelineVisitor directly (and while you can
> reach in and do so, there are no promises here and these are subject to
> change). As for DoFnRunner, if you're trying to reach in at this level
> you're probably going to have to be replicating a bunch of surrounding
> infrastructure as well. I would recommend using a BundleProcessor [1] to
> coordinate the work (which will internally wire up the chain of DoFns
> correctly and take them through their proper lifecycle). As mentioned
> above, you can directly borrow the translations in fn_api_runner to go from
> a full Pipeline graph (proto) to a set of fused DoFns to execute in
> topological order (as ProcessBundleDescriptor protos, which is
> what BundleProcessor accepts).
>
> [1]
> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>
>
>> Thanks again for the help,
>> Joey
>>
>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> Another advantage of a portable runner would be that it will be using
>>> well defined and backwards compatible Beam portable APIs to communicate
>>> with SDKs. I think this is especially important for runners that do not live
>>> in the Beam repo since otherwise future SDK releases could break your
>>> runner in subtle ways. Also portability gives you more flexibility when it
>>> comes to choosing an SDK to define the pipeline and will allow you to
>>> execute transforms in any SDK via cross-language.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>>> user@beam.apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <jo...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Totally doable by one person, especially given the limited feature set
>>>>>> you mention above.
>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is
>>>>>> a good starting point as to what the relationship between a Runner and the
>>>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>>>> perspective of an SDK, but the story is largely about the interface which
>>>>>> is directly applicable).
>>>>>
>>>>>
>>>>> Great slides, I really appreciate the illustrations.
>>>>>
>>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>>> imagined that once the Runner started execution of a workflow, it was
>>>>> Runner all the way down. Is the Fn API the only way to implement a runner?
>>>>> Our execution environment is a bit constrained in such a way that we can't
>>>>> expose the APIs required to implement the Fn API. To be forthright, we
>>>>> basically only have the ability to start a worker either with a known
>>>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or with
>>>>> a bundle of data to process and return the outputs for. We're constrained
>>>>> from really any additional communication with a worker beyond that.
>>>>>
>>>>
>>>> The "worker" abstraction gives the ability to wrap any user code in a
>>>> way that it can be called from any runner. If you're willing to constrain
>>>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" can
>>>> be a logical, rather than physical, concept.
>>>>
>>>> Another way to look at it is that in practice, the "runner" often has
>>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical
>>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be
>>>> inlined (e.g. if it's 100% Python on both sides). See, for example,
>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>
>>>>
>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <jo...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks all for the responses!
>>>>>>>
>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>>> at first, I’d suggest to answer two questions for yourself:
>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>
>>>>>>>
>>>>>>> Portable sounds great, but the answer depends on how much additional
>>>>>>> cost it'd require to implement portable over non-portable, even considering
>>>>>>> future deprecation (unless deprecation is happening tomorrow). I'm not
>>>>>>> familiar enough to know what the additional cost is so I don't have a firm
>>>>>>> answer.
>>>>>>>
>>>>>>
>>>>>> I would say it would not be that expensive to write it in a "portable
>>>>>> compatible" way (i.e. it uses the publicly-documented protocol as the
>>>>>> interface rather than reaching into internal details) even if it doesn't
>>>>>> use gRPC and fire up separate processes/docker images for the workers
>>>>>> (preferring to do all of that inline like the Python portable direct
>>>>>> runner does).
>>>>>>
>>>>>>
>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>
>>>>>>> I'd be developing this runner against the python SDK and if the
>>>>>>> runner only worked with the python SDK that'd be okay in the short term
>>>>>>>
>>>>>>
>>>>>> Yes. And if you do it the above way, it should be easy to extend (or
>>>>>> not) if/when the need arises.
>>>>>>
>>>>>>
>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>
>>>>>>> Likely won't be contributed back to Beam (not sure if it'd actually
>>>>>>> be useful to a wide audience anyways).
>>>>>>>
>>>>>>> The context is we've been developing an in-house large-scale
>>>>>>> pipeline framework that encapsulates both the programming model and the
>>>>>>> runner/execution of data workflows. As it's grown, I keep finding myself
>>>>>>> just reimplementing features and abstractions Beam has already implemented,
>>>>>>> so I wanted to explore adopting Beam. Our execution environment is very
>>>>>>> particular though and our workflows require it (due to the way we license
>>>>>>> our software), so my plan was to try to create a very basic runner that
>>>>>>> uses our execution environment. The runner could have very few features
>>>>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
>>>>>>> introduce a shim for some of our internally implemented transforms and
>>>>>>> assess from there.
>>>>>>>
>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your
>>>>>>> thoughts as to whether this seems reasonable and achievable without a large
>>>>>>> concerted effort or even if the general idea makes any sense. (I recognize
>>>>>>> that it might not be *easy*, but I don't have the resources to
>>>>>>> dedicate more than myself to work on a PoC)
>>>>>>>
>>>>>>
>>>>>> Totally doable by one person, especially given the limited feature
>>>>>> set you mention above.
>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>> is a good starting point as to what the relationship between a Runner and
>>>>>> the SDK is at a level of detail sufficient for implementation (told from
>>>>>> the perspective of an SDK, but the story is largely about the interface
>>>>>> which is directly applicable).
>>>>>>
>>>>>> Given the limited feature set you proposed, this is similar to the
>>>>>> original Python portable runner which took a week or two to put together
>>>>>> (granted a lot has been added since then), or the typescript direct runner
>>>>>> (
>>>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>>>>> ) which was done (in its basic form, no support for side inputs and such)
>>>>>> in less than a week. Granted, as these are local runners, this illustrates
>>>>>> only the Beam-side complexity of things (not the work of actually
>>>>>> implementing a distributed shuffle, starting and assigning work to multiple
>>>>>> workers, etc.), but presumably that's the kind of thing your execution
>>>>>> environment already takes care of.
>>>>>>
>>>>>> As for some more concrete pointers, you could probably leverage a lot
>>>>>> of what's there by invoking create_stages
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>
>>>>>> which will do optimization, fusion, etc. and then implementing your
>>>>>> own version of run_stages
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>
>>>>>> to execute these in topological order on your compute infrastructure.
>>>>>> (If you're not doing streaming, this is much more straightforward than all
>>>>>> the bundler scheduler stuff that currently exists in that code).
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>>>> at first, I’d suggest to answer two questions for yourself:
>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>
>>>>>>>>
>>>>>>>> The answer to this should be portable, as non-portable ones will be
>>>>>>>> deprecated.
>>>>>>>>
>>>>>>>>
>>>>>>>> Well, actually, I don’t remember us discussing this question here in
>>>>>>>> detail before or reaching a common agreement on it.
>>>>>>>>
>>>>>>>> Actually, I’m not sure that I clearly understand what is meant by
>>>>>>>> “deprecation” in this case. For example, the Portable Spark Runner is
>>>>>>>> heavily based on the native Spark RDD runner and its translations. So, which
>>>>>>>> part should be deprecated, and what is the reason for that?
>>>>>>>>
>>>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>>>
>>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>> So I agree that more details on this would be useful.
>>>>>>>>
>>>>>>>> —
>>>>>>>> Alexey
>>>>>>>>
>>>>>>>>
>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>>
>>>>>>>>
>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>
>>>>>>>> On a more serious note, could you tell us a bit more about the
>>>>>>>> runner you're looking at implementing?
>>>>>>>>
>>>>>>>>
>>>>>>>>> Then, depending on answers, I’d suggest to take as an example one
>>>>>>>>> of the most similar Beam runners and use it as a more detailed source of
>>>>>>>>> information along with Beam runner doc mentioned before.
>>>>>>>>>
>>>>>>>>> —
>>>>>>>>> Alexey
>>>>>>>>>
>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <jo...@schrodinger.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Beam community!
>>>>>>>>>
>>>>>>>>> I'm interested in trying to implement a runner with my company's
>>>>>>>>> execution environment but I'm struggling to get started. I've read the docs
>>>>>>>>> page
>>>>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>>>>>>> on implementing a runner but it's quite high level. Anyone have any
>>>>>>>>> concrete suggestions on getting started?
>>>>>>>>>
>>>>>>>>> I've started by cloning and running the hello world example
>>>>>>>>> <https://github.com/apache/beam-starter-python>. I've then
>>>>>>>>> subclassed `PipelineRunner
>>>>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>>>>>>> to create my own custom runner but at this point I'm a bit stuck. My custom
>>>>>>>>> runner just looks like
>>>>>>>>>
>>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>>     def run_pipeline(self, pipeline,
>>>>>>>>>                      options):
>>>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>>>
>>>>>>>>> And when using it I get an error about not having implemented
>>>>>>>>> "Impulse"
>>>>>>>>>
>>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>>>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object
>>>>>>>>> at 0x135d9ff40>.
>>>>>>>>>
>>>>>>>>> Am I going about this the right way? Are there tests I can run my
>>>>>>>>> custom runner against to validate it beyond just running the hello world
>>>>>>>>> example? I'm finding myself just digging through the beam source to try to
>>>>>>>>> piece together how a runner works and I'm struggling to get a foothold. Any
>>>>>>>>> guidance would be greatly appreciated, especially if anyone has any
>>>>>>>>> experience implementing their own python runner.
>>>>>>>>>
>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>> Cheers,
>>>>>>>>> Joey
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>

Re: Getting Started With Implementing a Runner

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <jo...@schrodinger.com> wrote:

> Working on this on and off now and getting some pretty good traction.
>
> One thing I'm a little worried about is all the classes that are marked
> "internal use only". A lot of these seem either very useful or possibly
> critical to writing a runner. How strictly should I interpret these private
> implementation labels?
>
> A few bits that I'm interested in using ordered by how surprised I was to
> find that they're internal only.
>
>  - apache_beam.pipeline.AppliedPTransform
>  - apache_beam.pipeline.PipelineVisitor
>  - apache_beam.runners.common.DoFnRunner
>

The public API is the protos. You should not have to interact
with AppliedPTransform and PipelineVisitor directly (and while you can
reach in and do so, there are no promises here and these are subject to
change). As for DoFnRunner, if you're trying to reach in at this level
you're probably going to have to be replicating a bunch of surrounding
infrastructure as well. I would recommend using a BundleProcessor [1] to
coordinate the work (which will internally wire up the chain of DoFns
correctly and take them through their proper lifecycle). As mentioned
above, you can directly borrow the translations in fn_api_runner to go from
a full Pipeline graph (proto) to a set of fused DoFns to execute in
topological order (as ProcessBundleDescriptor protos, which is
what BundleProcessor accepts).

[1]
https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
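
To make "a set of fused DoFns to execute in topological order" a bit more
concrete, here is a minimal, Beam-free sketch of that scheduling idea. It is
only an illustration under stated assumptions: the Stage class, the
run_stages function and the stage names below are hypothetical stand-ins,
not the classes or signatures in fn_api_runner, and a real runner would hand
each stage to a BundleProcessor rather than call plain Python functions.

```python
from graphlib import TopologicalSorter


class Stage:
    """Hypothetical stand-in for one fused stage (not a Beam class)."""

    def __init__(self, name, inputs, output, fns):
        self.name = name
        self.inputs = inputs  # names of the PCollections this stage consumes
        self.output = output  # name of the PCollection this stage produces
        self.fns = fns        # fused element-wise functions, applied in order


def run_stages(stages):
    """Run every stage after the stages that produce its inputs."""
    by_name = {s.name: s for s in stages}
    producers = {s.output: s.name for s in stages}
    # Map each stage to the set of stages it depends on.
    graph = {s.name: {producers[i] for i in s.inputs} for s in stages}

    pcollections = {}
    for name in TopologicalSorter(graph).static_order():
        stage = by_name[name]
        if stage.inputs:
            bundle = [e for i in stage.inputs for e in pcollections[i]]
        else:
            # A root stage starts from one empty element, loosely like Impulse.
            bundle = [b""]
        # Each function returns an iterable of outputs for one input element.
        for fn in stage.fns:
            bundle = [out for element in bundle for out in fn(element)]
        pcollections[stage.output] = bundle
    return pcollections


# Stages are deliberately listed out of order; the sorter fixes that.
stages = [
    Stage("count", ["words"], "lengths", [lambda w: [len(w)]]),
    Stage("read", [], "lines", [lambda _: ["a bb", "ccc"]]),
    Stage("split", ["lines"], "words", [lambda line: line.split()]),
]
result = run_stages(stages)
print(result["lengths"])
```

In a batch-only runner, the loop body is roughly where the work for one
stage would be shipped to the execution environment, with the intermediate
PCollections materialized between stages.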


> Thanks again for the help,
> Joey
>
> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Another advantage of a portable runner would be that it will be using
>> well defined and backwards compatible Beam portable APIs to communicate
>> with SDKs. I think this is especially important for runners that do not live
>> in the Beam repo since otherwise future SDK releases could break your
>> runner in subtle ways. Also portability gives you more flexibility when it
>> comes to choosing an SDK to define the pipeline and will allow you to
>> execute transforms in any SDK via cross-language.
>>
>> Thanks,
>> Cham
>>
>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>> user@beam.apache.org> wrote:
>>
>>>
>>>
>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <jo...@schrodinger.com>
>>> wrote:
>>>
>>>> Totally doable by one person, especially given the limited feature set
>>>>> you mention above.
>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is
>>>>> a good starting point as to what the relationship between a Runner and the
>>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>>> perspective of an SDK, but the story is largely about the interface which
>>>>> is directly applicable).
>>>>
>>>>
>>>> Great slides, I really appreciate the illustrations.
>>>>
>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>> imagined that once the Runner started execution of a workflow, it was
>>>> Runner all the way down. Is the Fn API the only way to implement a runner?
>>>> Our execution environment is a bit constrained in such a way that we can't
>>>> expose the APIs required to implement the Fn API. To be forthright, we
>>>> basically only have the ability to start a worker either with a known
>>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or with
>>>> a bundle of data to process and return the outputs for. We're constrained
>>>> from really any additional communication with a worker beyond that.
>>>>
>>>
>>> The "worker" abstraction gives the ability to wrap any user code in a
>>> way that it can be called from any runner. If you're willing to constrain
>>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" can
>>> be a logical, rather than physical, concept.
>>>
>>> Another way to look at it is that in practice, the "runner" often has
>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical
>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be
>>> inlined (e.g. if it's 100% Python on both sides). See, for example,
>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>
>>>
>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <jo...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks all for the responses!
>>>>>>
>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>> at fist, I’d suggest to answer two questions for yourself:
>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>
>>>>>>
>>>>>> Portable sounds great, but the answer depends on how much additional
>>>>>> cost it'd require to implement portable over non-portable, even considering
>>>>>> future deprecation (unless deprecation is happening tomorrow). I'm not
>>>>>> familiar enough to know what the additional cost is so I don't have a firm
>>>>>> answer.
>>>>>>
>>>>>
>>>>> I would way it would not be that expensive to write it in a "portable
>>>>> compatible" way (i.e it uses the publicly-documented protocol as the
>>>>> interface rather than reaching into internal details) even if it doesn't
>>>>> use GRCP and fire up separate processes/docker images for the workers
>>>>> (preferring to do tall of that inline like the Python portable direct
>>>>> runner does).
>>>>>
>>>>>
>>>>>> - Which SDK I should use for this runner?
>>>>>>>
>>>>>> I'd be developing this runner against the python SDK and if the
>>>>>> runner only worked with the python SDK that'd be okay in the short term
>>>>>>
>>>>>
>>>>> Yes. And if you do it the above way, it should be easy to extend (or
>>>>> not) if/when the need arises.
>>>>>
>>>>>
>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>
>>>>>> Likely won't be contributed back to Beam (not sure if it'd actually
>>>>>> be useful to a wide audience anyways).
>>>>>>
>>>>>> The context is we've been developing an in-house large-scale pipeline
>>>>>> framework that encapsulates both the programming model and the
>>>>>> runner/execution of data workflows. As it's grown, I keep finding myself
>>>>>> just reimplementing features and abstractions Beam has already implemented,
>>>>>> so I wanted to explore adopting Beam. Our execution environment is very
>>>>>> particular though and our workflows require it (due to the way we license
>>>>>> our software), so my plan was to try to create a very basic runner that
>>>>>> uses our execution environment. The runner could have very few features
>>>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
>>>>>> introduce a shim for some of our internally implemented transforms and
>>>>>> assess from there.
>>>>>>
>>>>>> Not sure if this is a lofty goal or not, so happy to hear your
>>>>>> thoughts as to whether this seems reasonable and achievable without a large
>>>>>> concerted effort or even if the general idea makes any sense. (I recognize
>>>>>> that it might not be *easy*, but I don't have the resources to
>>>>>> dedicate more than myself to work on a PoC)
>>>>>>
>>>>>
>>>>> Totally doable by one person, especially given the limited feature set
>>>>> you mention above.
>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>> is a good starting point as to what the relationship between a Runner and
>>>>> the SDK is at a level of detail sufficient for implementation (told from
>>>>> the perspective of an SDK, but the story is largely about the interface
>>>>> which is directly applicable).
>>>>>
>>>>> Given the limited feature set you proposed, this is similar to the
>>>>> original Python portable runner which took a week or two to put together
>>>>> (granted a lot has been added since then), or the typescript direct runner
>>>>> (
>>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>>>> ) which was done (in its basic form, no support for side inputs and such)
>>>>> in less than a week. Granted, as these are local runners, this illustrates
>>>>> only the Beam-side complexity of things (not the work of actually
>>>>> implementing a distributed shuffle, starting and assigning work to multiple
>>>>> workers, etc. but presumably that's the kind of thing your execution
>>>>> environment already takes care of.
>>>>>
>>>>> As for some more concrete pointers, you could probably leverage a lot
>>>>> of what's there by invoking create_stages
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>
>>>>> which will do optimization, fusion, etc. and then implementing your
>>>>> own version of run_stages
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>
>>>>> to execute these in topological order on your compute infrastructure.
>>>>> (If you're not doing streaming, this is much more straightforward than all
>>>>> the bundler scheduler stuff that currently exists in that code).
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>>>>> user@beam.apache.org> wrote:
>>>>>>>
>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>
>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>>> at fist, I’d suggest to answer two questions for yourself:
>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>
>>>>>>>
>>>>>>> The answer to this should be portable, as non-portable ones will be
>>>>>>> deprecated.
>>>>>>>
>>>>>>>
>>>>>>> Well, actually this is a question that I don’t remember we discussed
>>>>>>> here in details before and had a common agreement.
>>>>>>>
>>>>>>> Actually, I’m not sure that I understand clearly what is meant by
>>>>>>> “deprecation" in this case. For example, Portable Spark Runner is heavily
>>>>>>> actually based on native Spark RDD runner and its translations. So, which
>>>>>>> part should be deprecated and what is a reason for that?
>>>>>>>
>>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>>
>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>> So I agree that more details on this would be useful.
>>>>>>>
>>>>>>> —
>>>>>>> Alexey
>>>>>>>
>>>>>>>
>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>
>>>>>>>
>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>
>>>>>>> On a more serious note, could you tell us a bit more about the
>>>>>>> runner you're looking at implementing?
>>>>>>>
>>>>>>>
>>>>>>>> Then, depending on answers, I’d suggest to take as an example one
>>>>>>>> of the most similar Beam runners and use it as a more detailed source of
>>>>>>>> information along with Beam runner doc mentioned before.
>>>>>>>>
>>>>>>>> —
>>>>>>>> Alexey
>>>>>>>>
>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <jo...@schrodinger.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Beam community!
>>>>>>>>
>>>>>>>> I'm interested in trying to implement a runner with my company's
>>>>>>>> execution environment but I'm struggling to get started. I've read the docs
>>>>>>>> page
>>>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>>>>>> on implementing a runner but it's quite high level. Anyone have any
>>>>>>>> concrete suggestions on getting started?
>>>>>>>>
>>>>>>>> I've started by cloning and running the hello world example
>>>>>>>> <https://github.com/apache/beam-starter-python>. I've then
>>>>>>>> subclassed `PipelineRunner
>>>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>>>>>> to create my own custom runner but at this point I'm a bit stuck. My custom
>>>>>>>> runner just looks like
>>>>>>>>
>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>     def run_pipeline(self, pipeline,
>>>>>>>>                      options):
>>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>>
>>>>>>>> And when using it I get an error about not having implemented
>>>>>>>> "Impulse"
>>>>>>>>
>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object
>>>>>>>> at 0x135d9ff40>.
>>>>>>>>
>>>>>>>> Am I going about this the right way? Are there tests I can run my
>>>>>>>> custom runner against to validate it beyond just running the hello world
>>>>>>>> example? I'm finding myself just digging through the beam source to try to
>>>>>>>> piece together how a runner works and I'm struggling to get a foothold. Any
>>>>>>>> guidance would be greatly appreciated, especially if anyone has any
>>>>>>>> experience implementing their own python runner.
>>>>>>>>
>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>> Cheers,
>>>>>>>> Joey
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>

Re: Getting Started With Implementing a Runner

Posted by Joey Tran <jo...@schrodinger.com>.
Working on this on and off now and getting some pretty good traction.

One thing I'm a little worried about is all the classes that are marked
"internal use only". A lot of these seem either very useful or possibly
critical to writing a runner. How strictly should I interpret these private
implementation labels?

A few bits that I'm interested in using, ordered by how surprised I was to
find that they're internal only:

 - apache_beam.pipeline.AppliedPTransform
 - apache_beam.pipeline.PipelineVisitor
 - apache_beam.runners.common.DoFnRunner

Thanks again for the help,
Joey


Re: Getting Started With Implementing a Runner

Posted by Chamikara Jayalath via user <us...@beam.apache.org>.
Another advantage of a portable runner is that it uses well-defined,
backwards-compatible Beam portable APIs to communicate with SDKs. I think
this is especially important for runners that do not live in the Beam repo,
since otherwise future SDK releases could break your runner in subtle ways.
Also, portability gives you more flexibility when it comes to choosing an
SDK to define the pipeline, and it allows you to execute transforms in any
SDK via cross-language.
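
As a rough illustration of what depending only on the portable
representation can look like, here is a minimal sketch in plain Python. It
is not Beam code: the Transform class and both helper functions are
hypothetical stand-ins for the portable pipeline graph (which, in the
Python SDK, you would get from Pipeline.to_runner_api()). Only the two URN
strings are real Beam identifiers. The idea is that a runner keys its
behaviour on URNs rather than on SDK classes, and can report up front which
primitives it does not yet handle, such as the Impulse from the original
error.

```python
from dataclasses import dataclass, field

# Real Beam URNs for two primitive transforms.
IMPULSE_URN = "beam:transform:impulse:v1"
PAR_DO_URN = "beam:transform:pardo:v1"


@dataclass
class Transform:
    """Hypothetical, simplified stand-in for a transform in the portable graph."""
    name: str
    urn: str = ""
    subtransforms: list = field(default_factory=list)


def leaf_transforms(transform):
    """Yield the transforms that have no subtransforms, in definition order."""
    if not transform.subtransforms:
        yield transform
        return
    for child in transform.subtransforms:
        yield from leaf_transforms(child)


def unsupported_urns(root, supported):
    """Return the sorted URNs of leaf transforms the runner cannot execute."""
    return sorted({t.urn for t in leaf_transforms(root)} - set(supported))


# A toy graph shaped like a Create followed by a Map (names are illustrative).
root = Transform("root", subtransforms=[
    Transform("Create", subtransforms=[
        Transform("Create/Impulse", IMPULSE_URN),
        Transform("Create/FlatMap", PAR_DO_URN),
    ]),
    Transform("Map", PAR_DO_URN),
])

# A runner that so far only knows how to execute ParDo.
missing = unsupported_urns(root, supported={PAR_DO_URN})
print(missing)
```

Because the check depends only on URN strings, it should keep working
regardless of how a given SDK release organises its internal classes.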

Thanks,
Cham


Re: Getting Started With Implementing a Runner

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <jo...@schrodinger.com> wrote:

> Totally doable by one person, especially given the limited feature set you
>> mention above.
>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is
>> a good starting point as to what the relationship between a Runner and the
>> SDK is at a level of detail sufficient for implementation (told from the
>> perspective of an SDK, but the story is largely about the interface which
>> is directly applicable).
>
>
> Great slides, I really appreciate the illustrations.
>
> I hadn't realized there was a concept of an "SDK Worker", I had imagined
> that once the Runner started execution of a workflow, it was Runner all the
> way down. Is the Fn API the only way to implement a runner? Our execution
> environment is a bit constrained in such a way that we can't expose the
> APIs required to implement the Fn API. To be forthright, we basically only
> have the ability to start a worker either with a known Pub/Sub topic to
> expect data from and a Pub/Sub topic to write to; or with a bundle of data
> to process and return the outputs for. We're constrained from really any
> additional communication with a worker beyond that.
>

The "worker" abstraction gives the ability to wrap any user code in a way
that it can be called from any runner. If you're willing to constrain the
code you're wrapping (e.g. "Python DoFns only") then this "worker" can be a
logical, rather than physical, concept.

Another way to look at it is that in practice, the "runner" often has its
own notion of "workers" which wrap (often in a 1:1 way) the logical "SDK
Worker" (which in turn invokes the actual DoFns). This latter may be
inlined (e.g. if it's 100% Python on both sides). See, for example,
https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
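
To make the "logical rather than physical worker" idea concrete, here is a
minimal sketch in plain Python. It deliberately does not use any Beam classes:
WorkerHandler, InlineWorker and run_bundle are hypothetical names invented for
illustration, and the "DoFns" are just functions that return an iterable of
outputs per element. The only point is that the runner talks to a small
handler interface, and one implementation of it runs the user code in-process.

```python
from typing import Callable, Iterable, List, Protocol

# Hypothetical names throughout; none of these are Beam classes.
ElementFn = Callable[[object], Iterable[object]]


class WorkerHandler(Protocol):
    """What the runner needs from a worker: run fused functions over a bundle."""

    def process_bundle(self, fns: List[ElementFn], bundle: List[object]) -> List[object]:
        ...


class InlineWorker:
    """A 'logical' worker: the user code runs in the runner's own process."""

    def process_bundle(self, fns, bundle):
        elements = list(bundle)
        for fn in fns:  # apply each fused step in order, flat-map style
            elements = [out for element in elements for out in fn(element)]
        return elements


def run_bundle(worker: WorkerHandler, fns, bundle):
    # The runner only depends on the handler interface, so an implementation
    # that ships the bundle to a separate process could be swapped in later.
    return worker.process_bundle(fns, bundle)


result = run_bundle(
    InlineWorker(),
    [lambda line: line.split(), lambda word: [(word, 1)]],
    ["to be", "or not"],
)
print(result)
```

If the code being wrapped is constrained to Python on both sides, nothing
forces the handler to be a separate process; that is the sense in which the
worker is only a logical concept here.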


> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <jo...@schrodinger.com>
>> wrote:
>>
>>> Thanks all for the responses!
>>>
>>> If the Beam Runner Authoring Guide is rather high-level for you, then, at
>>>> first, I’d suggest answering two questions for yourself:
>>>> - Am I going to implement a portable runner or native one?
>>>>
>>>
>>> Portable sounds great, but the answer depends on how much additional
>>> cost it'd require to implement portable over non-portable, even considering
>>> future deprecation (unless deprecation is happening tomorrow). I'm not
>>> familiar enough to know what the additional cost is so I don't have a firm
>>> answer.
>>>
>>
>> I would say it would not be that expensive to write it in a "portable
>> compatible" way (i.e., it uses the publicly documented protocol as the
>> interface rather than reaching into internal details) even if it doesn't
>> use gRPC and fire up separate processes/Docker images for the workers
>> (preferring to do all of that inline like the Python portable direct
>> runner does).
>>
>>
>>> - Which SDK should I use for this runner?
>>>>
>>> I'd be developing this runner against the python SDK and if the runner
>>> only worked with the python SDK that'd be okay in the short term
>>>
>>
>> Yes. And if you do it the above way, it should be easy to extend (or not)
>> if/when the need arises.
>>
>>
>>> Also, we don’t know if this new runner will be contributed back to Beam,
>>>> what is a runtime and what actually is a final goal of it.
>>>
>>> Likely won't be contributed back to Beam (not sure if it'd actually be
>>> useful to a wide audience anyways).
>>>
>>> The context is we've been developing an in-house large-scale pipeline
>>> framework that encapsulates both the programming model and the
>>> runner/execution of data workflows. As it's grown, I keep finding myself
>>> just reimplementing features and abstractions Beam has already implemented,
>>> so I wanted to explore adopting Beam. Our execution environment is very
>>> particular though and our workflows require it (due to the way we license
>>> our software), so my plan was to try to create a very basic runner that
>>> uses our execution environment. The runner could have very few features
>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
>>> introduce a shim for some of our internally implemented transforms and
>>> assess from there.
>>>
>>> Not sure if this is a lofty goal or not, so happy to hear your thoughts
>>> as to whether this seems reasonable and achievable without a large
>>> concerted effort or even if the general idea makes any sense. (I recognize
>>> that it might not be *easy*, but I don't have the resources to dedicate
>>> more than myself to work on a PoC)
>>>
>>
>> Totally doable by one person, especially given the limited feature set
>> you mention above.
>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>> is a good starting point as to what the relationship between a Runner and
>> the SDK is at a level of detail sufficient for implementation (told from
>> the perspective of an SDK, but the story is largely about the interface
>> which is directly applicable).
>>
>> Given the limited feature set you proposed, this is similar to the
>> original Python portable runner which took a week or two to put together
>> (granted a lot has been added since then), or the typescript direct runner
>> (
>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>> ) which was done (in its basic form, no support for side inputs and such)
>> in less than a week. Granted, as these are local runners, this illustrates
>> only the Beam-side complexity of things (not the work of actually
>> implementing a distributed shuffle, starting and assigning work to multiple
>> workers, etc.), but presumably that's the kind of thing your execution
>> environment already takes care of.
>>
>> As for some more concrete pointers, you could probably leverage a lot of
>> what's there by invoking create_stages
>>
>>
>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>
>> which will do optimization, fusion, etc. and then implementing your own
>> version of run_stages
>>
>>
>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>
>> to execute these in topological order on your compute infrastructure. (If
>> you're not doing streaming, this is much more straightforward than all the
>> bundler scheduler stuff that currently exists in that code).
>>
>>
>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>>
>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then, at
>>>>> first, I’d suggest answering two questions for yourself:
>>>>> - Am I going to implement a portable runner or native one?
>>>>>
>>>>
>>>> The answer to this should be portable, as non-portable ones will be
>>>> deprecated.
>>>>
>>>>
>>>> Well, actually, I don’t remember us discussing this question here in
>>>> detail before or reaching a common agreement on it.
>>>>
>>>> Actually, I’m not sure I clearly understand what is meant by
>>>> “deprecation” in this case. For example, the Portable Spark Runner is
>>>> actually heavily based on the native Spark RDD runner and its translations.
>>>> So, which part should be deprecated, and what is the reason for that?
>>>>
>>>> Well, anyway I guess it’s off topic here.
>>>>
>>>> Also, we don’t know if this new runner will be contributed back to
>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>> So I agree that more details on this would be useful.
>>>>
>>>> —
>>>> Alexey
>>>>
>>>>
>>>> - Which SDK should I use for this runner?
>>>>>
>>>>
>>>> The answer to the above question makes this one moot :).
>>>>
>>>> On a more serious note, could you tell us a bit more about the runner
>>>> you're looking at implementing?
>>>>
>>>>
>>>>> Then, depending on answers, I’d suggest to take as an example one of
>>>>> the most similar Beam runners and use it as a more detailed source of
>>>>> information along with Beam runner doc mentioned before.
>>>>>
>>>>> —
>>>>> Alexey
>>>>>
>>>>> On 22 Jun 2023, at 14:39, Joey Tran <jo...@schrodinger.com> wrote:
>>>>>
>>>>> Hi Beam community!
>>>>>
>>>>> I'm interested in trying to implement a runner with my company's
>>>>> execution environment but I'm struggling to get started. I've read the docs
>>>>> page
>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>>> on implementing a runner but it's quite high level. Anyone have any
>>>>> concrete suggestions on getting started?
>>>>>
>>>>> I've started by cloning and running the hello world example
>>>>> <https://github.com/apache/beam-starter-python>. I've then subclassed
>>>>> `PipelineRunner
>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>>> to create my own custom runner but at this point I'm a bit stuck. My custom
>>>>> runner just looks like
>>>>>
>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>     def run_pipeline(self, pipeline,
>>>>>                      options):
>>>>>         self.visit_transforms(pipeline, options)
>>>>>
>>>>> And when using it I get an error about not having implemented "Impulse"
>>>>>
>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object
>>>>> at 0x135d9ff40>.
>>>>>
>>>>> Am I going about this the right way? Are there tests I can run my
>>>>> custom runner against to validate it beyond just running the hello world
>>>>> example? I'm finding myself just digging through the beam source to try to
>>>>> piece together how a runner works and I'm struggling to get a foothold. Any
>>>>> guidance would be greatly appreciated, especially if anyone has any
>>>>> experience implementing their own python runner.
>>>>>
>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>> Cheers,
>>>>> Joey
>>>>>
>>>>>
>>>>>
>>>>

Re: Getting Started With Implementing a Runner

Posted by Joey Tran <jo...@schrodinger.com>.
>
> Totally doable by one person, especially given the limited feature set you
> mention above.
> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is
> a good starting point as to what the relationship between a Runner and the
> SDK is at a level of detail sufficient for implementation (told from the
> perspective of an SDK, but the story is largely about the interface which
> is directly applicable).


Great slides, I really appreciate the illustrations.

I hadn't realized there was a concept of an "SDK Worker", I had imagined
that once the Runner started execution of a workflow, it was Runner all the
way down. Is the Fn API the only way to implement a runner? Our execution
environment is a bit constrained in such a way that we can't expose the
APIs required to implement the Fn API. To be forthright, we basically only
have the ability to start a worker either with a known Pub/Sub topic to
expect data from and a Pub/Sub topic to write to; or with a bundle of data
to process and return the outputs for. We're constrained from really any
additional communication with a worker beyond that.
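
To pin down the second mode (a worker started with a bundle of data that
returns its outputs), the contract could be sketched roughly as below. This is
only an illustration under assumptions: the JSON payload layout, the STEPS
registry and worker_main are hypothetical and not part of Beam or of our
environment's actual API.

```python
import json
from typing import Callable, Dict, Iterable

# Hypothetical registry: step names the worker image knows how to run.
STEPS: Dict[str, Callable[[object], Iterable[object]]] = {
    "split_words": lambda line: line.split(),
    "upper": lambda word: [word.upper()],
}


def worker_main(payload: bytes) -> bytes:
    """Entry point for a worker started with one bundle; returns its outputs."""
    request = json.loads(payload)
    elements = request["elements"]
    for name in request["steps"]:
        step = STEPS[name]
        # Each step maps one element to zero or more output elements.
        elements = [out for element in elements for out in step(element)]
    return json.dumps({"elements": elements}).encode("utf-8")


# Runner side: build the request, "launch" the worker, decode the reply.
request = json.dumps({"steps": ["split_words", "upper"], "elements": ["a b", "c"]})
reply = json.loads(worker_main(request.encode("utf-8")))
print(reply["elements"])
```

There is no channel back to the runner while the bundle is being processed,
which is the constraint described above.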

On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <ro...@google.com> wrote:

> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <jo...@schrodinger.com>
> wrote:
>
>> Thanks all for the responses!
>>
>> If the Beam Runner Authoring Guide is rather high-level for you, then, at
>>> first, I’d suggest answering two questions for yourself:
>>> - Am I going to implement a portable runner or native one?
>>>
>>
>> Portable sounds great, but the answer depends on how much additional cost
>> it'd require to implement portable over non-portable, even considering
>> future deprecation (unless deprecation is happening tomorrow). I'm not
>> familiar enough to know what the additional cost is so I don't have a firm
>> answer.
>>
>
> I would say it would not be that expensive to write it in a "portable
> compatible" way (i.e., it uses the publicly documented protocol as the
> interface rather than reaching into internal details) even if it doesn't
> use gRPC and fire up separate processes/Docker images for the workers
> (preferring to do all of that inline like the Python portable direct
> runner does).
>
>
>> - Which SDK should I use for this runner?
>>>
>> I'd be developing this runner against the python SDK and if the runner
>> only worked with the python SDK that'd be okay in the short term
>>
>
> Yes. And if you do it the above way, it should be easy to extend (or not)
> if/when the need arises.
>
>
>> Also, we don’t know if this new runner will be contributed back to Beam,
>>> what is a runtime and what actually is a final goal of it.
>>
>> Likely won't be contributed back to Beam (not sure if it'd actually be
>> useful to a wide audience anyways).
>>
>> The context is we've been developing an in-house large-scale pipeline
>> framework that encapsulates both the programming model and the
>> runner/execution of data workflows. As it's grown, I keep finding myself
>> just reimplementing features and abstractions Beam has already implemented,
>> so I wanted to explore adopting Beam. Our execution environment is very
>> particular though and our workflows require it (due to the way we license
>> our software), so my plan was to try to create a very basic runner that
>> uses our execution environment. The runner could have very few features
>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
>> introduce a shim for some of our internally implemented transforms and
>> assess from there.
>>
>> Not sure if this is a lofty goal or not, so happy to hear your thoughts
>> as to whether this seems reasonable and achievable without a large
>> concerted effort or even if the general idea makes any sense. (I recognize
>> that it might not be *easy*, but I don't have the resources to dedicate
>> more than myself to work on a PoC)
>>
>
> Totally doable by one person, especially given the limited feature set you
> mention above.
> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
> is a good starting point as to what the relationship between a Runner and
> the SDK is at a level of detail sufficient for implementation (told from
> the perspective of an SDK, but the story is largely about the interface
> which is directly applicable).
>
> Given the limited feature set you proposed, this is similar to the
> original Python portable runner which took a week or two to put together
> (granted a lot has been added since then), or the typescript direct runner
> (
> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
> ) which was done (in its basic form, no support for side inputs and such)
> in less than a week. Granted, as these are local runners, this illustrates
> only the Beam-side complexity of things (not the work of actually
> implementing a distributed shuffle, starting and assigning work to multiple
> workers, etc.), but presumably that's the kind of thing your execution
> environment already takes care of.
>
> As for some more concrete pointers, you could probably leverage a lot of
> what's there by invoking create_stages
>
>
> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>
> which will do optimization, fusion, etc. and then implementing your own
> version of run_stages
>
>
> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>
> to execute these in topological order on your compute infrastructure. (If
> you're not doing streaming, this is much more straightforward than all the
> bundler scheduler stuff that currently exists in that code).
>
>
>
>>
>>
>>
>>
>>
>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>>
>>>
>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <us...@beam.apache.org>
>>> wrote:
>>>
>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <ar...@gmail.com>
>>> wrote:
>>>
>>>> If the Beam Runner Authoring Guide is rather high-level for you, then, at
>>>> first, I’d suggest answering two questions for yourself:
>>>> - Am I going to implement a portable runner or native one?
>>>>
>>>
>>> The answer to this should be portable, as non-portable ones will be
>>> deprecated.
>>>
>>>
>>> Well, actually, I don’t remember us discussing this question here in
>>> detail before or reaching a common agreement on it.
>>>
>>> Actually, I’m not sure I clearly understand what is meant by
>>> “deprecation” in this case. For example, the Portable Spark Runner is
>>> actually heavily based on the native Spark RDD runner and its translations.
>>> So, which part should be deprecated, and what is the reason for that?
>>>
>>> Well, anyway I guess it’s off topic here.
>>>
>>> Also, we don’t know if this new runner will be contributed back to Beam,
>>> what is a runtime and what actually is a final goal of it.
>>> So I agree that more details on this would be useful.
>>>
>>> —
>>> Alexey
>>>
>>>
>>> - Which SDK should I use for this runner?
>>>>
>>>
>>> The answer to the above question makes this one moot :).
>>>
>>> On a more serious note, could you tell us a bit more about the runner
>>> you're looking at implementing?
>>>
>>>
>>>> Then, depending on answers, I’d suggest to take as an example one of
>>>> the most similar Beam runners and use it as a more detailed source of
>>>> information along with Beam runner doc mentioned before.
>>>>
>>>> —
>>>> Alexey
>>>>
>>>> On 22 Jun 2023, at 14:39, Joey Tran <jo...@schrodinger.com> wrote:
>>>>
>>>> Hi Beam community!
>>>>
>>>> I'm interested in trying to implement a runner with my company's
>>>> execution environment but I'm struggling to get started. I've read the docs
>>>> page
>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>> on implementing a runner but it's quite high level. Anyone have any
>>>> concrete suggestions on getting started?
>>>>
>>>> I've started by cloning and running the hello world example
>>>> <https://github.com/apache/beam-starter-python>. I've then subclassed `
>>>> PipelineRunner
>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>> to create my own custom runner but at this point I'm a bit stuck. My custom
>>>> runner just looks like
>>>>
>>>> class CustomRunner(runner.PipelineRunner):
>>>>     def run_pipeline(self, pipeline,
>>>>                      options):
>>>>         self.visit_transforms(pipeline, options)
>>>>
>>>> And when using it I get an error about not having implemented "Impulse"
>>>>
>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object
>>>> at 0x135d9ff40>.
>>>>
>>>> Am I going about this the right way? Are there tests I can run my
>>>> custom runner against to validate it beyond just running the hello world
>>>> example? I'm finding myself just digging through the beam source to try to
>>>> piece together how a runner works and I'm struggling to get a foothold. Any
>>>> guidance would be greatly appreciated, especially if anyone has any
>>>> experience implementing their own python runner.
>>>>
>>>> Thanks in advance! Also, could I get a Slack invite?
>>>> Cheers,
>>>> Joey
>>>>
>>>>
>>>>
>>>

Re: Getting Started With Implementing a Runner

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <jo...@schrodinger.com>
wrote:

> Thanks all for the responses!
>
> If the Beam Runner Authoring Guide is rather high-level for you, then, at
>> first, I’d suggest answering two questions for yourself:
>> - Am I going to implement a portable runner or native one?
>>
>
> Portable sounds great, but the answer depends on how much additional cost
> it'd require to implement portable over non-portable, even considering
> future deprecation (unless deprecation is happening tomorrow). I'm not
> familiar enough to know what the additional cost is so I don't have a firm
> answer.
>

I would say it would not be that expensive to write it in a "portable
compatible" way (i.e., it uses the publicly documented protocol as the
interface rather than reaching into internal details) even if it doesn't
use gRPC and fire up separate processes/Docker images for the workers
(preferring to do all of that inline like the Python portable direct
runner does).
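
As a rough illustration of what "using the protocol as the interface" means,
the sketch below dispatches on transform URNs instead of on SDK classes. The
three URN strings are the real identifiers Beam uses for these primitives;
everything else is an assumption made for brevity: the list-of-dicts
"pipeline" is a simplified stand-in for the Runner API protos, and run() is a
toy in-memory interpreter, not Beam code.

```python
from collections import defaultdict

# Real Beam URNs for three primitives; the dict-based "pipeline" below is a
# simplified, hypothetical stand-in for the Runner API protos.
IMPULSE = "beam:transform:impulse:v1"
PAR_DO = "beam:transform:pardo:v1"
GROUP_BY_KEY = "beam:transform:group_by_key:v1"


def run(transforms):
    """Executes transforms (already in dependency order), keyed by output name."""
    pcollections = {}
    for t in transforms:
        if t["urn"] == IMPULSE:
            # Impulse has no input and emits a single empty byte string.
            out = [b""]
        elif t["urn"] == PAR_DO:
            fn = t["fn"]
            out = [o for e in pcollections[t["input"]] for o in fn(e)]
        elif t["urn"] == GROUP_BY_KEY:
            groups = defaultdict(list)
            for key, value in pcollections[t["input"]]:
                groups[key].append(value)
            out = list(groups.items())
        else:
            raise NotImplementedError(t["urn"])
        pcollections[t["output"]] = out
    return pcollections


pipeline = [
    {"urn": IMPULSE, "output": "start"},
    {"urn": PAR_DO, "input": "start", "output": "words",
     "fn": lambda _: [("a", 1), ("b", 1), ("a", 1)]},
    {"urn": GROUP_BY_KEY, "input": "words", "output": "grouped"},
]
print(run(pipeline)["grouped"])
```

A runner written against URNs like this never needs to know which SDK class
produced a transform, which is what keeps it easy to extend later.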


> - Which SDK should I use for this runner?
>>
> I'd be developing this runner against the python SDK and if the runner
> only worked with the python SDK that'd be okay in the short term
>

Yes. And if you do it the above way, it should be easy to extend (or not)
if/when the need arises.


> Also, we don’t know if this new runner will be contributed back to Beam,
>> what is a runtime and what actually is a final goal of it.
>
> Likely won't be contributed back to Beam (not sure if it'd actually be
> useful to a wide audience anyways).
>
> The context is we've been developing an in-house large-scale pipeline
> framework that encapsulates both the programming model and the
> runner/execution of data workflows. As it's grown, I keep finding myself
> just reimplementing features and abstractions Beam has already implemented,
> so I wanted to explore adopting Beam. Our execution environment is very
> particular though and our workflows require it (due to the way we license
> our software), so my plan was to try to create a very basic runner that
> uses our execution environment. The runner could have very few features
> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
> introduce a shim for some of our internally implemented transforms and
> assess from there.
>
> Not sure if this is a lofty goal or not, so happy to hear your thoughts as
> to whether this seems reasonable and achievable without a large concerted
> effort or even if the general idea makes any sense. (I recognize that it
> might not be *easy*, but I don't have the resources to dedicate more than
> myself to work on a PoC)
>

Totally doable by one person, especially given the limited feature set you
mention above.
https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
is a good starting point as to what the relationship between a Runner and
the SDK is at a level of detail sufficient for implementation (told from
the perspective of an SDK, but the story is largely about the interface
which is directly applicable).

Given the limited feature set you proposed, this is similar to the original
Python portable runner which took a week or two to put together (granted a
lot has been added since then), or the typescript direct runner (
https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
) which was done (in its basic form, no support for side inputs and such)
in less than a week. Granted, as these are local runners, this illustrates
only the Beam-side complexity of things (not the work of actually
implementing a distributed shuffle, starting and assigning work to multiple
workers, etc.), but presumably that's the kind of thing your execution
environment already takes care of.

As for some more concrete pointers, you could probably leverage a lot of
what's there by invoking create_stages

https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362

which will do optimization, fusion, etc. and then implementing your own
version of run_stages

https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392

to execute these in topological order on your compute infrastructure. (If
you're not doing streaming, this is much more straightforward than all the
bundler scheduler stuff that currently exists in that code).
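
For the batch-only case, "execute these in topological order" can be sketched
as below. This is not the fn_api_runner code: Stage here is a hypothetical,
much-simplified stand-in for a fused stage, and the submit callback is an
assumed hook marking where a stage would be handed to your own compute
infrastructure.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Callable, Dict, List


@dataclass
class Stage:
    """Hypothetical fused stage: a name, its work, and upstream stage names."""
    name: str
    fn: Callable[[List[list]], list]
    inputs: List[str] = field(default_factory=list)


def run_stages(stages: List[Stage], submit) -> Dict[str, list]:
    """Runs each stage once all of its upstream stages have produced output."""
    by_name = {s.name: s for s in stages}
    # TopologicalSorter takes a mapping of node -> predecessors.
    order = TopologicalSorter({s.name: set(s.inputs) for s in stages}).static_order()
    results: Dict[str, list] = {}
    for name in order:
        stage = by_name[name]
        upstream = [results[i] for i in stage.inputs]
        results[name] = submit(stage, upstream)
    return results


stages = [
    Stage("sum", lambda ins: [sum(ins[0])], ["double"]),
    Stage("read", lambda ins: [1, 2, 3]),
    Stage("double", lambda ins: [x * 2 for x in ins[0]], ["read"]),
]
# Here submit just runs the stage locally; a real runner would dispatch it.
results = run_stages(stages, submit=lambda stage, upstream: stage.fn(upstream))
print(results["sum"])
```

Because each stage only starts after its inputs are fully materialized, no
bundle scheduling or watermark tracking is needed in this batch-only sketch.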



>
>
>
>
>
> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
>
>>
>>
>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <us...@beam.apache.org>
>> wrote:
>>
>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <ar...@gmail.com>
>> wrote:
>>
>>> If the Beam Runner Authoring Guide is rather high-level for you, then, at
>>> first, I’d suggest answering two questions for yourself:
>>> - Am I going to implement a portable runner or native one?
>>>
>>
>> The answer to this should be portable, as non-portable ones will be
>> deprecated.
>>
>>
>> Well, actually, I don’t remember us discussing this question here in
>> detail before or reaching a common agreement on it.
>>
>> Actually, I’m not sure I clearly understand what is meant by
>> “deprecation” in this case. For example, the Portable Spark Runner is
>> actually heavily based on the native Spark RDD runner and its translations.
>> So, which part should be deprecated, and what is the reason for that?
>>
>> Well, anyway I guess it’s off topic here.
>>
>> Also, we don’t know if this new runner will be contributed back to Beam,
>> what is a runtime and what actually is a final goal of it.
>> So I agree that more details on this would be useful.
>>
>> —
>> Alexey
>>
>>
>> - Which SDK should I use for this runner?
>>>
>>
>> The answer to the above question makes this one moot :).
>>
>> On a more serious note, could you tell us a bit more about the runner
>> you're looking at implementing?
>>
>>
>>> Then, depending on answers, I’d suggest to take as an example one of the
>>> most similar Beam runners and use it as a more detailed source of
>>> information along with Beam runner doc mentioned before.
>>>
>>> —
>>> Alexey
>>>
>>> On 22 Jun 2023, at 14:39, Joey Tran <jo...@schrodinger.com> wrote:
>>>
>>> Hi Beam community!
>>>
>>> I'm interested in trying to implement a runner with my company's
>>> execution environment but I'm struggling to get started. I've read the docs
>>> page
>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>> on implementing a runner but it's quite high level. Anyone have any
>>> concrete suggestions on getting started?
>>>
>>> I've started by cloning and running the hello world example
>>> <https://github.com/apache/beam-starter-python>. I've then subclassed `
>>> PipelineRunner
>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>> to create my own custom runner but at this point I'm a bit stuck. My custom
>>> runner just looks like
>>>
>>> class CustomRunner(runner.PipelineRunner):
>>>     def run_pipeline(self, pipeline,
>>>                      options):
>>>         self.visit_transforms(pipeline, options)
>>>
>>> And when using it I get an error about not having implemented "Impulse"
>>>
>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object
>>> at 0x135d9ff40>.
>>>
>>> Am I going about this the right way? Are there tests I can run my custom
>>> runner against to validate it beyond just running the hello world example?
>>> I'm finding myself just digging through the beam source to try to piece
>>> together how a runner works and I'm struggling to get a foothold. Any
>>> guidance would be greatly appreciated, especially if anyone has any
>>> experience implementing their own python runner.
>>>
>>> Thanks in advance! Also, could I get a Slack invite?
>>> Cheers,
>>> Joey
>>>
>>>
>>>
>>

Re: Getting Started With Implementing a Runner

Posted by Joey Tran <jo...@schrodinger.com>.
Thanks all for the responses!

If the Beam Runner Authoring Guide is rather high-level for you, then, at first,
> I’d suggest answering two questions for yourself:
> - Am I going to implement a portable runner or native one?
>

Portable sounds great, but the answer depends on how much additional cost
it'd require to implement portable over non-portable, even considering
future deprecation (unless deprecation is happening tomorrow). I'm not
familiar enough to know what the additional cost is so I don't have a firm
answer.

- Which SDK should I use for this runner?
>
I'd be developing this runner against the python SDK and if the runner only
worked with the python SDK that'd be okay in the short term

Also, we don’t know if this new runner will be contributed back to Beam,
> what is a runtime and what actually is a final goal of it.

Likely won't be contributed back to Beam (not sure if it'd actually be
useful to a wide audience anyways).

The context is we've been developing an in-house large-scale pipeline
framework that encapsulates both the programming model and the
runner/execution of data workflows. As it's grown, I keep finding myself
just reimplementing features and abstractions Beam has already implemented,
so I wanted to explore adopting Beam. Our execution environment is very
particular though and our workflows require it (due to the way we license
our software), so my plan was to try to create a very basic runner that
uses our execution environment. The runner could have very few features
e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
introduce a shim for some of our internally implemented transforms and
assess from there.

Not sure if this is a lofty goal or not, so happy to hear your thoughts as
to whether this seems reasonable and achievable without a large concerted
effort or even if the general idea makes any sense. (I recognize that it
might not be *easy*, but I don't have the resources to dedicate more than
myself to work on a PoC)





On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <us...@beam.apache.org>
> wrote:
>
>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <ar...@gmail.com>
>> wrote:
>>
>>> If the Beam Runner Authoring Guide is rather high-level for you, then,
>>> at first, I’d suggest answering two questions for yourself:
>>> - Am I going to implement a portable runner or native one?
>>
>> The answer to this should be portable, as non-portable ones will be
>> deprecated.
>
> Well, actually, I don’t remember us discussing this question here in
> detail before or reaching a common agreement on it.
>
> Actually, I’m not sure I clearly understand what is meant by
> “deprecation” in this case. For example, the Portable Spark Runner is
> actually heavily based on the native Spark RDD runner and its
> translations. So, which part should be deprecated, and what is the reason
> for that?
>
> Well, anyway, I guess it’s off topic here.
>
> Also, we don’t know if this new runner will be contributed back to Beam,
> what the runtime is, or what its final goal actually is.
> So I agree that more details on this would be useful.
>
> —
> Alexey

Re: Getting Started With Implementing a Runner

Posted by Alexey Romanenko <ar...@gmail.com>.

> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <us...@beam.apache.org> wrote:
> 
> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <ar...@gmail.com> wrote:
>> If the Beam Runner Authoring Guide is rather high-level for you, then, at first, I’d suggest answering two questions for yourself:
>> - Am I going to implement a portable runner or native one?
> 
> 
> The answer to this should be portable, as non-portable ones will be deprecated.

Well, actually, I don’t remember us discussing this question here in detail before or reaching a common agreement on it.

Actually, I’m not sure I clearly understand what is meant by “deprecation” in this case. For example, the Portable Spark Runner is actually heavily based on the native Spark RDD runner and its translations. So, which part should be deprecated, and what is the reason for that?

Well, anyway, I guess it’s off topic here.

Also, we don’t know if this new runner will be contributed back to Beam, what the runtime is, or what its final goal actually is.
So I agree that more details on this would be useful.

—
Alexey



Re: Getting Started With Implementing a Runner

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> If the Beam Runner Authoring Guide is rather high-level for you, then, at
> first, I’d suggest answering two questions for yourself:
> - Am I going to implement a portable runner or native one?
>

The answer to this should be portable, as non-portable ones will be
deprecated.

> - Which SDK should I use for this runner?
>

The answer to the above question makes this one moot :).

On a more serious note, could you tell us a bit more about the runner
you're looking at implementing?



Re: Getting Started With Implementing a Runner

Posted by Alexey Romanenko <ar...@gmail.com>.
If the Beam Runner Authoring Guide is rather high-level for you, then, at first, I’d suggest answering two questions for yourself:
- Am I going to implement a portable runner or native one?
- Which SDK should I use for this runner?

Then, depending on the answers, I’d suggest taking one of the most similar Beam runners as an example and using it as a more detailed source of information, along with the Beam runner doc mentioned before.

—
Alexey



Re: Getting Started With Implementing a Runner

Posted by Joey Tran <jo...@schrodinger.com>.
Thanks, Jack!

I've tried that Slack link, but it requires an account with an @apache email
address.


On Thu, Jun 22, 2023 at 10:08 AM Jack McCluskey via user <
user@beam.apache.org> wrote:

> Hey Joey,
>
> The best resource to look at, at the moment, is likely Robert Burke's
> Prism runner, which he is currently implementing (
> https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism).
> Runners are pretty complicated, and there are a number of primitives that
> you need to have implemented on the runner side to get pipelines executing.
>
> The Slack has a link on the Beam Contact Us page
> <https://beam.apache.org/community/contact-us/>, and I'd highly recommend
> routing questions towards the developer mailing list (dev@beam.apache.org)
> rather than the user one for runner implementation things.
>
> Thanks,
>
> Jack McCluskey

Re: Getting Started With Implementing a Runner

Posted by Jack McCluskey via user <us...@beam.apache.org>.
Hey Joey,

The best resource to look at, at the moment, is likely Robert Burke's Prism
runner, which he is currently implementing (
https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism).
Runners are pretty complicated, and there are a number of primitives that
you need to have implemented on the runner side to get pipelines executing.
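
To make "primitives" a bit more concrete, here is a rough toy sketch in plain
Python. It does not use the Beam SDK at all and is not how Prism or any real
runner is structured. The URN strings are, as far as I know, the ones Beam's
portability model uses for these primitives; everything else (the handler
functions, the stage tuples, `execute`) is made up for illustration. It only
shows the general idea that a runner needs some handler for each primitive,
and that a missing handler produces an error like the "Impulse" one from the
original question.

```python
from collections import defaultdict

# URNs believed to match Beam's portable primitives (treat as an assumption).
IMPULSE = "beam:transform:impulse:v1"
PAR_DO = "beam:transform:pardo:v1"
GROUP_BY_KEY = "beam:transform:group_by_key:v1"
FLATTEN = "beam:transform:flatten:v1"


def run_impulse(inputs, payload):
    # Impulse has no input and emits a single empty-bytes element.
    return [b""]


def run_pardo(inputs, payload):
    # Hypothetical payload: a function mapping one element to an iterable.
    (elements,) = inputs
    return [out for element in elements for out in payload(element)]


def run_group_by_key(inputs, payload):
    # Collect the values of (key, value) pairs under each key.
    (elements,) = inputs
    grouped = defaultdict(list)
    for key, value in elements:
        grouped[key].append(value)
    return list(grouped.items())


def run_flatten(inputs, payload):
    # Merge several input collections into one.
    return [element for collection in inputs for element in collection]


HANDLERS = {
    IMPULSE: run_impulse,
    PAR_DO: run_pardo,
    GROUP_BY_KEY: run_group_by_key,
    FLATTEN: run_flatten,
}


def execute(stages):
    """Run (output_name, urn, input_names, payload) stages in order."""
    collections = {}
    for output_name, urn, input_names, payload in stages:
        if urn not in HANDLERS:
            raise NotImplementedError(
                f"Execution of {urn} not implemented in this toy runner")
        inputs = [collections[name] for name in input_names]
        collections[output_name] = HANDLERS[urn](inputs, payload)
    return collections


# A tiny word count expressed with the toy primitives.
STAGES = [
    ("start", IMPULSE, [], None),
    ("words", PAR_DO, ["start"], lambda _: ["a", "b", "a"]),
    ("pairs", PAR_DO, ["words"], lambda word: [(word, 1)]),
    ("grouped", GROUP_BY_KEY, ["pairs"], None),
    ("counts", PAR_DO, ["grouped"], lambda kv: [(kv[0], sum(kv[1]))]),
]

if __name__ == "__main__":
    print(execute(STAGES)["counts"])  # [('a', 2), ('b', 1)]
```

A real runner also has to deal with windowing, coders, and actually
executing user code in an SDK worker, none of which this sketch attempts.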

The Slack has a link on the Beam Contact Us page
<https://beam.apache.org/community/contact-us/>, and I'd highly recommend
routing questions towards the developer mailing list (dev@beam.apache.org)
rather than the user one for runner implementation things.

Thanks,

Jack McCluskey
