Posted to github@beam.apache.org by "moradology (via GitHub)" <gi...@apache.org> on 2024/03/19 16:41:52 UTC

[I] Seeking guidance on new runner implementation [beam]

moradology opened a new issue, #30675:
URL: https://github.com/apache/beam/issues/30675

   OK, so I'm very interested in making the batch subset of Apache Beam play nicely with EMR Serverless. Unfortunately, this is difficult to pull off with the portable runner - perhaps even impossible - because, so far as I can tell, it assumes the Spark master UI is reachable to take work from Beam's job server. To that end, I've begun adapting (roughly) the strategy found in the Dask runner in the Python SDK: building up PySpark RDDs that are submitted directly via whatever `SparkSession` PySpark finds at runtime. So far, so good. I even have a (partial) implementation of support for side inputs!
   
   Unfortunately, here I am running into some difficulties and would love to get some feedback on whatever it is that I might be missing. As runner authors will surely be aware, it is necessary to distinguish between `AsIter` and `AsSingleton` side-input wrappers (both `AsSideInput` subclasses). Fair enough, but by the time I am traversing `AppliedPTransform` instances to evaluate, that information appears to be gone - perhaps lost in some of the serialization/deserialization that occurs during transform application!
   
   Here's what I'm seeing when I print out some context about a given `AppliedPTransform` [at this point in the runner](https://github.com/moradology/beam-pyspark-runner/blob/real_traversal_infrastructure/beam_pyspark_runner/pyspark_runner.py#L125) (so far, I've only run some visitors over the AST to collect some context that I use later in planning out execution):
   ```
    'write test/Write/WriteImpl/WriteBundles': {'input_producer_labels': ['write '
                                                                          'test/Write/WriteImpl/WindowInto(WindowIntoFn)'],
                                                'input_producers': [AppliedPTransform(write test/Write/WriteImpl/WindowInto(WindowIntoFn), WindowInto)],
                                                'inputs': (<PCollection[write test/Write/WriteImpl/WindowInto(WindowIntoFn).None] at 0x123cbfd90>,),
                                                'outputs': dict_values([<PCollection[write test/Write/WriteImpl/WriteBundles.None] at 0x123c6a2d0>]),
                                                'parent': 'write '
                                                          'test/Write/WriteImpl',
                                                'side_inputs': (<apache_beam.pvalue._UnpickledSideInput object at 0x123cadad0>,),
                                                'type': 'ParDo',
                                                'xform_side_inputs': [<apache_beam.pvalue._UnpickledSideInput object at 0x123cadad0>]}}
   ```
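   For reference, the visitor itself is roughly this shape - a simplified sketch, not my actual code (the real one subclasses Beam's `PipelineVisitor`; this stand-in only assumes the node exposes the `AppliedPTransform` attributes used below):

   ```python
   # Simplified sketch of a context-collecting visitor. The real version
   # subclasses apache_beam.pipeline.PipelineVisitor; here we only assume
   # `node` carries the AppliedPTransform attributes referenced below.
   class ContextCollector:
       def __init__(self):
           self.context = {}

       def visit_transform(self, node):
           # node is an AppliedPTransform; record what execution planning needs.
           self.context[node.full_label] = {
               "inputs": node.inputs,
               "outputs": node.outputs,
               "side_inputs": node.side_inputs,
               "parent": node.parent.full_label if node.parent else None,
           }
   ```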
   
   Note that I have an `_UnpickledSideInput`. This type does not carry the `AsIter`/`AsSingleton` context that appears to be absolutely necessary for deciding how the results of a side input should be passed to a consumer (the whole iterable, or just its single element).
   
   What am I missing here? If I drop a debugger into Beam's source at `core.ParDo`, I can see this information; it just appears to be lost later on. Example: `<ParDo(PTransform) label=[ParDo(_WriteBundleDoFn)] side_inputs=[AsSingleton(PCollection[write test/Write/WriteImpl/InitializeWrite.None])] at 0x12d7c8410>`
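   To make the problem concrete, here is (roughly) the dispatch a runner has to perform. The classes below are hypothetical stand-ins, not Beam's actual `apache_beam.pvalue.AsSingleton`/`AsIter` - the point is the shape of the decision, which is exactly what `_UnpickledSideInput` doesn't let me make:

   ```python
   # Hypothetical stand-ins for apache_beam.pvalue.AsSingleton / AsIter, used
   # only to illustrate the dispatch a runner needs; real runner code would
   # isinstance-check against the actual Beam classes.
   class AsSingleton:
       pass

   class AsIter:
       pass

   def resolve_side_input(side_input, values):
       """Decide what the consumer receives from a materialized side input."""
       if isinstance(side_input, AsSingleton):
           vals = list(values)
           if len(vals) != 1:
               raise ValueError("AsSingleton side input must have exactly one element")
           return vals[0]       # just the head
       if isinstance(side_input, AsIter):
           return list(values)  # the whole collection
       raise TypeError(f"Unsupported side input type: {type(side_input)!r}")
   ```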


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] Seeking guidance on new runner implementation [beam]

Posted by "moradology (via GitHub)" <gi...@apache.org>.
moradology commented on issue #30675:
URL: https://github.com/apache/beam/issues/30675#issuecomment-2009727938

   @cisaacstern I've made some progress on this. It turns out that in the context of running a `TestPipeline` (at least via pytest), side inputs get pickled and unpickled. Running the exact same pipeline outside of tests yields very different information.
   In tests: `<apache_beam.pvalue._UnpickledSideInput object at 0x13276c590>`
   In normal pipeline: `AsSingleton(PCollection[write test/Write/WriteImpl/InitializeWrite.None])`
   
   The `_UnpickledSideInput` appears to include a `view_fn` on its associated data, which allows one to work around the loss of information in tests. Still, I'm curious whether this behavior (tests potentially changing things) is what the library authors had in mind.
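   For anyone following along, the workaround looks roughly like this. The `_Fake*` classes below are stand-ins for Beam's `SideInputData` / `_UnpickledSideInput`; the `_side_input_data()`/`view_fn` names are what I observed on Beam internals, so treat them as assumptions:

   ```python
   # Sketch of the view_fn workaround: materialize every side input through
   # the view_fn carried on its side-input data, rather than branching on
   # AsSingleton/AsIter wrappers that may not survive pickling.
   class _FakeSideInputData:
       def __init__(self, view_fn):
           self.view_fn = view_fn

   class _FakeUnpickledSideInput:
       def __init__(self, data):
           self._data = data

       def _side_input_data(self):
           return self._data

   def materialize_via_view_fn(side_input, raw_values):
       # Works whether the runner sees an AsSingleton/AsIter wrapper or an
       # _UnpickledSideInput, assuming both expose _side_input_data().
       return side_input._side_input_data().view_fn(raw_values)
   ```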




Re: [I] Seeking guidance on new runner implementation [beam]

Posted by "cisaacstern (via GitHub)" <gi...@apache.org>.
cisaacstern commented on issue #30675:
URL: https://github.com/apache/beam/issues/30675#issuecomment-2008380280

   👋 @moradology! Over in my WIP for DaskRunner SideInputs, I've got this:
   
   https://github.com/apache/beam/pull/27618/files#diff-bfb5ae715e9067778f492058e8a02ff877d6e7584624908ddbdd316853e6befbR173-R182
   
   (Which is heavily modeled on the RayRunner.)
   
   I actually don't remember off the top of my head what part of that surfaces the `As*` typing, but the tests pass over there so I think it's being captured somehow! With a little more time I could set up my dev environment and introspect a bit.

