You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Miguel Anzo Palomo <mi...@wizeline.com> on 2021/03/29 21:31:20 UTC

[Apace BEAM Go improvements] Lazy map side inputs

Hi,
I was checking out this task BEAM-3293
<https://issues.apache.org/jira/browse/BEAM-3293> and I'm having some
issues fully understanding the idea of how side inputs work internally. Is
there any resource or specific example that can help to better understand
how they work and why/where the lazy map implementation would help so I can
get a better grasp of the task?

Thanks

-- 

Miguel Angel Anzo Palomo | WIZELINE

Software Engineer

miguel.anzo@wizeline.com

Remote Office

-- 
*This email and its contents (including any attachments) are being sent to
you on the condition of confidentiality and may be protected by legal
privilege. Access to this email by anyone other than the intended recipient
is unauthorized. If you are not the intended recipient, please immediately
notify the sender by replying to this message and delete the material
immediately from your system. Any further use, dissemination, distribution
or reproduction of this email is strictly prohibited. Further, no
representation is made with respect to any content contained in this email.*

Re: [Apace BEAM Go improvements] Lazy map side inputs

Posted by Miguel Anzo Palomo <mi...@wizeline.com>.
Hi Robert,
That’s a lot of information, and I'm still processing it, but it’s being
really useful and a great starting point to understand side inputs.
Thanks

On Tue, Mar 30, 2021 at 6:22 PM Robert Burke <re...@google.com> wrote:

> Buckle up, it's time to dive into Side Inputs along with Go SDK code links!
>
> First and foremost, all the protos that cover side inputs (namely
> beam_runner_api.proto [1], and beam_fn_api.proto [2], which nominally
> handle construction time, and execution time concerns respectively) refer
> to the design doc at
> https://s.apache.org/beam-fn-state-api-and-bundle-processing [3] when it
> comes to details around side inputs and the state API. I'm going to cover
> side inputs exclusively, from User side, to construction, to execution.
>
> Side Inputs are window scoped data outside of the parallel input that can
> be supplied through the runner. From the user side, this allows a pipeline
> to use any PCollection as additional (side) input to a DoFn, while also
> handling whether that side input is in the same window as the parallel
> input. It's better described in the programming guide section on Side
> Inputs [4], so I won't repeat their utility here. On the execution side,
> the data gets to the SDK side worker using the State Channel and API
> (described in [3])
>
> At pipeline construction time, an SDK will convert it's own internal
> constructs into the Beam Pipeline protos, and in the Go SDK, that happens
> in graphx/translate.go. The SDK translates DoFns into ParDoPayloads,
> populating a map of SideInput protos [5][6]. But before any of that, the
> user needs to be able to specify side inputs are to be used at all. So how
> does a user do that in Go?
>
> *# Part 1: Decomposition*
>
> In the Go SDK, because at time of design, Generics didn't exist (not till
> Go 1.18 at soonest), we heavily use the original generic concept: Functions
> with typed parameters. If you squint, funcs can be viewed as generic
> (apologies to the PL purists in the crowd). See the SDKs RFC [7] for more.
> Throughout this explanation I'll be using loose Go syntax to represent some
> concepts. This is not real Go code, and these symbols are arbitrary types,
> rather than concrete types. After the User hands us a DoFn, we decompose
> it's lifecycle methods (like ProcessElement) to see if it's well formed,
> and what inputs it accepts and outputs it emits.
>
>  So a DoFn with it's ProcessElement  method can specify a wide variety of
> inputs. The designers ended up decreeing that we'll just specify an order
> to the parameters (instead of explicit tags, like Java does), and that will
> determine a parameter's role in a ParDo's graph representation. Roughly, as
> a pseudo-regular expression, for a ProcessElement it's defined as follows :
>
> func(FnValue, SideInput*, FnEmit*)
>
> Where FnValue means any user element in a PCollection, SideInput can be
> one of FnValue, FnIter, FnReIter, and FnEmit represents outputs from this
> DoFn. FnIter, and FnReIter represent Beam Iterables, used either for
> SideInputs, or for GBK values (mentioned in my last email). This is a
> dramatic simplification of the more complete expression for any Lifecycle
> method (seen at [8]) which has a more complete ordering. Order is
> important, as we don't have much else to go on, with the first FnValue is
> always the parallel input. Those Fn* terms are also defined in that same
> file. This is how the SDK understands functions and parameters, though
> decomposition.
>
> In the Go SDK, Side Inputs can be an iterable (represented by a `func(*V)
> bool` type), a ReIterable (represented by a `func() func(*V) bool` type),
> or a Singleton (represented by the type, V). We use functions to handle
> iterables, because of the lack of generics. Similarly, we can and should
> handle functional representations of Map access to that data (the subject
> of this current discussion, BEAM-3293
> <https://issues.apache.org/jira/browse/BEAM-3293>).
>
> To see what a given user type parameter represents, we have helper
> functions [9] to make that determination, called in a state machine [10].
> To handle map functions for BEAM-3293, new functions, representation kinds,
> and cases would need to be added there.
>
> A wrinkle in this is that Side inputs can be a singleton value (a
> PCollection with 1 element), so the SDK can't statically know what a DoFn's
> parameters mean without the context of it's input PCollections.  Depending
> on the input PCollection, a func(K,V) that is decomposed to func(FnValue,
> FnValue) could either be a DoFn that processes a PCollection<K> with a
> singleton side input PCollection<V>, or a DoFn that processes a
> PCollection<KV<K,V>>. For that we need to bind the inputs to the DoFn
> parameters.
>
> *# Part 2: Binding*
>
> Now that the SDK understands the shape of the user DoFn, it needs to
> understand how it connects to the graph. This is done through binding the
> input PCollections to the DoFns parameters.
>
> Pertinent to this discussion is [11] where side inputs kinds are bound to
> specific types. This would need to handle and produce the new map kinds.
> SideInput Maps are simpler, since we know they *must* be associated with a
> PCollection<KV<K, V>>. If the Nth SideInput PCollection doesn't match with
> the Nth side input parameter, then an error should occur at binding time.
>
> Binding errors are tricky since the mistake is definitely at the specific
> DoFn, but all we know is there's a mismatch between what the DoFn requires
> (determined by the earlier Decomposition), and what was provided by
> pipeline construction (via beam.SideInput options). But if the shapes
> match, and the Go types match, we're golden, and can move onto proto
> translation. This should mostly be handled, but worth looking into
> specifically when adding a new side input kind.
>
> *# Part 3: Translation to Protos*
> Now that the user half of the SDK understands Map function parameter kinds
> as side inputs, we need to translate them to the proto pipeline graph. Like
> most translations to the beam protos, this happens in graphx/translate.go
> at [6]. This is arguably the critical bit, as it defines how the runner is
> supposed to serve values.
>
> How are iterable side inputs currently handled? Reading down from [6],
> when side inputs are present, the Go SDK will add an additional Transform
> that converts the input PCollection<Foo> into a PCollection<KV<K, Foo>>
> where the key is always the empty string (""), this is represented by the
> Go SDK specific URNIterableSideInputKey URN. All such SDK specific URN are
> prefixed by "beam:go" to avoid mixing them up with actual URNs defined by
> the Beam protos. This URN will be detected at execution time. Most of that
> case statement is to set up that additional Fixed Key Transform in the
> graph.  The local side input key (i#), is mapped to pull it's values from
> the keyed input (referring to the global key), along with including the
> side input in the map with that same local key.
>
> *Aside:* It appears the Go SDK presently ignores concepts like the ViewFn
> and WindowMappingFn, one or both of which might allow the SDK to avoid
> adding in the additional keying PTransform as a PTransform. This is
> Technical Debt, as this is probably not how things are done in the portable
> Python and Java SDKs. I'm uncertain how much this may need to change to
> enable Map views, but I suspect not at all at this time.
>
> The short version for random access side input maps is, we're saying
> "we're passing a PCollection<KV<K,V>> and would like you to be able to spit
> the values back out at us on demand". This means all were' putting into the
> Map or MultiMap case at line 324 below (co-opted for the functional
> interpretations, rather than as single map[K]V values) is the section
> starting at line 312, populating  the side input map [12].
>
> *# Part 4: Execution from Protos*
>
> OK, so, at this point, the user has constructed a pipeline, and started a
> job with a runner, which in turn spun up workers. Those workers include an
> SDK side harness that live largely in the Go SDK exec and harness packages.
> It connects to the FnAPI services on the runner side, and  (loosely) starts
> to receive ProcessBundleDescriptors which describe subgraphs to process and
> their associate data. Focusing still on side inputs..
>
> There are two things to look into to understand what's going on on the
> execution side (mostly handled in exec/translate.go [13]),that URN for
> Iterable Side inputs, and preparing to read side inputs.
>
> First is how we use that URNIterableSideInputKey urn. That's done at [14],
> where all it does is create a special FixedKey execution node [15]. This is
> a special node that turns PCollection<V> into PCollection<KV<K, V>>. This
> includes if that original V is actually a KV itself, like KV<K2, V2>,
> meaning the output would have nested KVs: PCollection<KV<K, KV<K2, V2>>>.
> Regardless, in most cases, the next node would be a datasink, which then
> encodes the values and outputs them to the runner. This is happening to the
> PCollection being used as a side input.  In particular, all values are
> essentially wrapped as the value of a KV, and associated with a fixed key,
> in this case the empty string ("").
>
> That key is important, since it's the key for the state APIs MultiMap side
> input for when Iterables are read. If we didn't associate everything with a
> single key, the Runner would allow random access with given key requests,
> which is what we're implementing now. Fancy that :D.
>
> On the side of the ParDo that's going to be reading the Side input, it
> receives a variety of data that it can use to query the State API, and
> process responses from it, such as the port to query against, id for this
> set of state, the receiving transform requesting the data, and the window
> coder and element coders. [16]
> These are wrapped into a side input adapter [17] and passed to the ParDo
> for actual execution [18]
>
> *Aside:* How about that, a comment about how we aren't using view_fn and
> window_mapping_fn... Likely critical for using side inputs with triggers
> and non-Global windows... No matter for now.
>
> *# Part 5: Reading the Side Input*
>
> Elsewhere the harness has received the configuration to start executing
> the plan. Data is about to flow through the Data channel, and provide our
> DoFn with primary input elements. But what about side inputs?
>
> Before each primary input element, we do some initialization to make sure
> that we have the right data to pass along along with it. Notionally this is
> a bit of light caching to ensure that the element we're executing shares a
> window with the cached data, but if it's not yet initialized or it's not,
> then we need to rebuild it from scratch. This is in ParDo.initSideInput
> [19].  Each side input configuration was wrapped into an adapter that knows
> how to produce iterables or whatever else it needs, which is happening
> here. This will most likely need to change to handle maps as well, as this
> was built with the assumption of only having iterables. This work changes
> this assumption.
>
> The Go SDK abstracts streams of data from wherever they're coming from
> with ReStreams and Streams [20], and ReusableInputs [21]. The streams are
> how decoded data is provided or reset, while the ReusableInputs are for
> rewinding those without necessarily making new runner requests.
>
> This likely won't need to change entirely with maps, but at present, we
> haven't made it simple to provide a new key for querying.
>
> How do we use user side keys for querying though? The SideInputAdapter
> NewIterable demonstrates how: We take the key, encode it, and the window,
> and request the element stream from the StateReader [22]. The StateReader
> abstracts actually calling the State service for us and its OpenSideInput
> method should continue to serve that purpose for us. It is implemented in
> harness/statemgr.go [23]. The StateReader is also where most of the
> protocol in the state API doc [3] is implemented.
>
> With a simple iterable (say func(*V) bool), the user passes an allocated v
> pointer to the iterable, and then something happens, and if it's
> successful, the function returns true, indicating the value is ready for
> use. Same if it were a func(*K,*V) bool. These functions are generated via
> reflection in exec/inputs.go [24] or code generation before compile time.
> We use this approach because generics do not exist, so we had no
> alternative. Until they do (next year sometime), map side inputs will need
> to do the same. We need reflection or code generation so we can have the
> actual type used by the user back in part 1, otherwise invoking the DoFn
> will fail at runtime. See the Design RFC [7] for the rationale.
>
> Lets say we're using a random access function, with function types like
> func(K) func(*V) bool, we need a struct to handle it's own state (the
> references to streams and such) and have a method that can pretend to be
> that type. For iterators that type is iterValue [25], and it's invoke
> method [26].The invoke method matches the signature for the
> reflect.MakeFunc method, which can then produce a value with the right
> function type. You can see makeIter [27] for how that call happens.
>
> So say we have a mapValue type, with an invoke method, it would need to
> take in it's key argument, and pass that to some ReStream object. ReStream
> is an interface type, so we can contrive things so that we know it's
> concrete type, and type assert it to it (say v.s.(mapReStream)), at which
> point we pass it the key. Then we receive the stream of values as normal,
> which can then be handled by an instance of iter.
>
> Note, I've changed the user type to `func(K) func(*V) bool` because at
> this point it can and could be a multimap, which would require it's own
> iteration. We can always add additional variations if we like, but it
> certainly complicates part 1 and part 5. At this point, once we can iterate
> through multimaps directly, I'd be willing to wait for generics where most
> of the complexity can be simplified into generic parent structs (like
> beam.KV, or beam.Iterator, or beam.ReIterator or beam.MultiMap...).
>
> This is far from fully complete, but it this should be able to get you
> started I think?
>
> Cheers,
> Robert Burke
> Who can't quite write these emails in his sleep, but did know everywhere
> to look in the Go SDK.
>
> [1]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto
> [2]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/fn-execution/src/main/proto/beam_fn_api.proto
> [3] https://s.apache.org/beam-fn-state-api-and-bundle-processing
> [4] https://beam.apache.org/documentation/programming-guide/#side-inputs
> [5]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L1186
> [6]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L278
> [7] https://s.apache.org/beam-go-sdk-design-rfc
> [8]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/fn.go#L379
> [9] hhttps://
> github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/sideinput.go#L32
> [10]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/fn.go#L318
> [11]
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/graph/bind.go#L179
>
> [12]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L312
>
> [13]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go
> [14]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go#L464
> [15]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L108
> [16]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402
>
> [17]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L48
> [18]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L38
> [19]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L254
> [20]
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go#L57
> [21]
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L36
> [22]
> https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L59
> [23]
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
> [24]
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L107
> [25]
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L98
> [26]
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L153
> [27]
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L107
>
>
> On Mon, Mar 29, 2021 at 5:38 PM Ahmet Altay <al...@google.com> wrote:
>
>> Adding some folks who might be able to help: @Robert Burke
>> <re...@google.com> @Kenneth Knowles <kl...@google.com> @Tyson Hamilton
>> <ty...@google.com>
>>
>> On Mon, Mar 29, 2021 at 2:31 PM Miguel Anzo Palomo <
>> miguel.anzo@wizeline.com> wrote:
>>
>>> Hi,
>>> I was checking out this task BEAM-3293
>>> <https://issues.apache.org/jira/browse/BEAM-3293> and I'm having some
>>> issues fully understanding the idea of how side inputs work internally. Is
>>> there any resource or specific example that can help to better understand
>>> how they work and why/where the lazy map implementation would help so I can
>>> get a better grasp of the task?
>>>
>>> Thanks
>>>
>>> --
>>>
>>> Miguel Angel Anzo Palomo | WIZELINE
>>>
>>> Software Engineer
>>>
>>> miguel.anzo@wizeline.com
>>>
>>> Remote Office
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *This email and its contents (including any attachments) are being sent
>>> toyou on the condition of confidentiality and may be protected by
>>> legalprivilege. Access to this email by anyone other than the intended
>>> recipientis unauthorized. If you are not the intended recipient, please
>>> immediatelynotify the sender by replying to this message and delete the
>>> materialimmediately from your system. Any further use, dissemination,
>>> distributionor reproduction of this email is strictly prohibited. Further,
>>> norepresentation is made with respect to any content contained in this
>>> email.*
>>
>>

-- 

Miguel Angel Anzo Palomo | WIZELINE

Software Engineer

miguel.anzo@wizeline.com

Remote Office

-- 
*This email and its contents (including any attachments) are being sent to
you on the condition of confidentiality and may be protected by legal
privilege. Access to this email by anyone other than the intended recipient
is unauthorized. If you are not the intended recipient, please immediately
notify the sender by replying to this message and delete the material
immediately from your system. Any further use, dissemination, distribution
or reproduction of this email is strictly prohibited. Further, no
representation is made with respect to any content contained in this email.*

Re: [Apace BEAM Go improvements] Lazy map side inputs

Posted by Robert Burke <re...@google.com>.
Buckle up, it's time to dive into Side Inputs along with Go SDK code links!

First and foremost, all the protos that cover side inputs (namely
beam_runner_api.proto [1], and beam_fn_api.proto [2], which nominally
handle construction time, and execution time concerns respectively) refer
to the design doc at
https://s.apache.org/beam-fn-state-api-and-bundle-processing [3] when it
comes to details around side inputs and the state API. I'm going to cover
side inputs exclusively, from User side, to construction, to execution.

Side Inputs are window scoped data outside of the parallel input that can
be supplied through the runner. From the user side, this allows a pipeline
to use any PCollection as additional (side) input to a DoFn, while also
handling whether that side input is in the same window as the parallel
input. It's better described in the programming guide section on Side
Inputs [4], so I won't repeat their utility here. On the execution side,
the data gets to the SDK side worker using the State Channel and API
(described in [3])

At pipeline construction time, an SDK will convert it's own internal
constructs into the Beam Pipeline protos, and in the Go SDK, that happens
in graphx/translate.go. The SDK translates DoFns into ParDoPayloads,
populating a map of SideInput protos [5][6]. But before any of that, the
user needs to be able to specify side inputs are to be used at all. So how
does a user do that in Go?

*# Part 1: Decomposition*

In the Go SDK, because at time of design, Generics didn't exist (not till
Go 1.18 at soonest), we heavily use the original generic concept: Functions
with typed parameters. If you squint, funcs can be viewed as generic
(apologies to the PL purists in the crowd). See the SDKs RFC [7] for more.
Throughout this explanation I'll be using loose Go syntax to represent some
concepts. This is not real Go code, and these symbols are arbitrary types,
rather than concrete types. After the User hands us a DoFn, we decompose
it's lifecycle methods (like ProcessElement) to see if it's well formed,
and what inputs it accepts and outputs it emits.

 So a DoFn with it's ProcessElement  method can specify a wide variety of
inputs. The designers ended up decreeing that we'll just specify an order
to the parameters (instead of explicit tags, like Java does), and that will
determine a parameter's role in a ParDo's graph representation. Roughly, as
a pseudo-regular expression, for a ProcessElement it's defined as follows :

func(FnValue, SideInput*, FnEmit*)

Where FnValue means any user element in a PCollection, SideInput can be one
of FnValue, FnIter, FnReIter, and FnEmit represents outputs from this DoFn.
FnIter, and FnReIter represent Beam Iterables, used either for SideInputs,
or for GBK values (mentioned in my last email). This is a dramatic
simplification of the more complete expression for any Lifecycle method
(seen at [8]) which has a more complete ordering. Order is important, as we
don't have much else to go on, with the first FnValue is always the
parallel input. Those Fn* terms are also defined in that same file. This is
how the SDK understands functions and parameters, though decomposition.

In the Go SDK, Side Inputs can be an iterable (represented by a `func(*V)
bool` type), a ReIterable (represented by a `func() func(*V) bool` type),
or a Singleton (represented by the type, V). We use functions to handle
iterables, because of the lack of generics. Similarly, we can and should
handle functional representations of Map access to that data (the subject
of this current discussion, BEAM-3293
<https://issues.apache.org/jira/browse/BEAM-3293>).

To see what a given user type parameter represents, we have helper
functions [9] to make that determination, called in a state machine [10].
To handle map functions for BEAM-3293, new functions, representation kinds,
and cases would need to be added there.

A wrinkle in this is that Side inputs can be a singleton value (a
PCollection with 1 element), so the SDK can't statically know what a DoFn's
parameters mean without the context of it's input PCollections.  Depending
on the input PCollection, a func(K,V) that is decomposed to func(FnValue,
FnValue) could either be a DoFn that processes a PCollection<K> with a
singleton side input PCollection<V>, or a DoFn that processes a
PCollection<KV<K,V>>. For that we need to bind the inputs to the DoFn
parameters.

*# Part 2: Binding*

Now that the SDK understands the shape of the user DoFn, it needs to
understand how it connects to the graph. This is done through binding the
input PCollections to the DoFns parameters.

Pertinent to this discussion is [11] where side inputs kinds are bound to
specific types. This would need to handle and produce the new map kinds.
SideInput Maps are simpler, since we know they *must* be associated with a
PCollection<KV<K, V>>. If the Nth SideInput PCollection doesn't match with
the Nth side input parameter, then an error should occur at binding time.

Binding errors are tricky since the mistake is definitely at the specific
DoFn, but all we know is there's a mismatch between what the DoFn requires
(determined by the earlier Decomposition), and what was provided by
pipeline construction (via beam.SideInput options). But if the shapes
match, and the Go types match, we're golden, and can move onto proto
translation. This should mostly be handled, but worth looking into
specifically when adding a new side input kind.

*# Part 3: Translation to Protos*
Now that the user half of the SDK understands Map function parameter kinds
as side inputs, we need to translate them to the proto pipeline graph. Like
most translations to the beam protos, this happens in graphx/translate.go
at [6]. This is arguably the critical bit, as it defines how the runner is
supposed to serve values.

How are iterable side inputs currently handled? Reading down from [6], when
side inputs are present, the Go SDK will add an additional Transform that
converts the input PCollection<Foo> into a PCollection<KV<K, Foo>> where
the key is always the empty string (""), this is represented by the Go SDK
specific URNIterableSideInputKey URN. All such SDK specific URN are
prefixed by "beam:go" to avoid mixing them up with actual URNs defined by
the Beam protos. This URN will be detected at execution time. Most of that
case statement is to set up that additional Fixed Key Transform in the
graph.  The local side input key (i#), is mapped to pull it's values from
the keyed input (referring to the global key), along with including the
side input in the map with that same local key.

*Aside:* It appears the Go SDK presently ignores concepts like the ViewFn
and WindowMappingFn, one or both of which might allow the SDK to avoid
adding in the additional keying PTransform as a PTransform. This is
Technical Debt, as this is probably not how things are done in the portable
Python and Java SDKs. I'm uncertain how much this may need to change to
enable Map views, but I suspect not at all at this time.

The short version for random access side input maps is, we're saying "we're
passing a PCollection<KV<K,V>> and would like you to be able to spit the
values back out at us on demand". This means all were' putting into the Map
or MultiMap case at line 324 below (co-opted for the functional
interpretations, rather than as single map[K]V values) is the section
starting at line 312, populating  the side input map [12].

*# Part 4: Execution from Protos*

OK, so, at this point, the user has constructed a pipeline, and started a
job with a runner, which in turn spun up workers. Those workers include an
SDK side harness that live largely in the Go SDK exec and harness packages.
It connects to the FnAPI services on the runner side, and  (loosely) starts
to receive ProcessBundleDescriptors which describe subgraphs to process and
their associate data. Focusing still on side inputs..

There are two things to look into to understand what's going on on the
execution side (mostly handled in exec/translate.go [13]),that URN for
Iterable Side inputs, and preparing to read side inputs.

First is how we use that URNIterableSideInputKey urn. That's done at [14],
where all it does is create a special FixedKey execution node [15]. This is
a special node that turns PCollection<V> into PCollection<KV<K, V>>. This
includes if that original V is actually a KV itself, like KV<K2, V2>,
meaning the output would have nested KVs: PCollection<KV<K, KV<K2, V2>>>.
Regardless, in most cases, the next node would be a datasink, which then
encodes the values and outputs them to the runner. This is happening to the
PCollection being used as a side input.  In particular, all values are
essentially wrapped as the value of a KV, and associated with a fixed key,
in this case the empty string ("").

That key is important, since it's the key for the state APIs MultiMap side
input for when Iterables are read. If we didn't associate everything with a
single key, the Runner would allow random access with given key requests,
which is what we're implementing now. Fancy that :D.

On the side of the ParDo that's going to be reading the Side input, it
receives a variety of data that it can use to query the State API, and
process responses from it, such as the port to query against, id for this
set of state, the receiving transform requesting the data, and the window
coder and element coders. [16]
These are wrapped into a side input adapter [17] and passed to the ParDo
for actual execution [18]

*Aside:* How about that, a comment about how we aren't using view_fn and
window_mapping_fn... Likely critical for using side inputs with triggers
and non-Global windows... No matter for now.

*# Part 5: Reading the Side Input*

Elsewhere the harness has received the configuration to start executing the
plan. Data is about to flow through the Data channel, and provide our DoFn
with primary input elements. But what about side inputs?

Before each primary input element, we do some initialization to make sure
that we have the right data to pass along along with it. Notionally this is
a bit of light caching to ensure that the element we're executing shares a
window with the cached data, but if it's not yet initialized or it's not,
then we need to rebuild it from scratch. This is in ParDo.initSideInput
[19].  Each side input configuration was wrapped into an adapter that knows
how to produce iterables or whatever else it needs, which is happening
here. This will most likely need to change to handle maps as well, as this
was built with the assumption of only having iterables. This work changes
this assumption.

The Go SDK abstracts streams of data from wherever they're coming from with
ReStreams and Streams [20], and ReusableInputs [21]. The streams are how
decoded data is provided or reset, while the ReusableInputs are for
rewinding those without necessarily making new runner requests.

This likely won't need to change entirely with maps, but at present, we
haven't made it simple to provide a new key for querying.

How do we use user side keys for querying though? The SideInputAdapter
NewIterable demonstrates how: We take the key, encode it, and the window,
and request the element stream from the StateReader [22]. The StateReader
abstracts actually calling the State service for us and its OpenSideInput
method should continue to serve that purpose for us. It is implemented in
harness/statemgr.go [23]. The StateReader is also where most of the
protocol in the state API doc [3] is implemented.

With a simple iterable (say func(*V) bool), the user passes an allocated v
pointer to the iterable, and then something happens, and if it's
successful, the function returns true, indicating the value is ready for
use. Same if it were a func(*K,*V) bool. These functions are generated via
reflection in exec/inputs.go [24] or code generation before compile time.
We use this approach because generics do not exist, so we had no
alternative. Until they do (next year sometime), map side inputs will need
to do the same. We need reflection or code generation so we can have the
actual type used by the user back in part 1, otherwise invoking the DoFn
will fail at runtime. See the Design RFC [7] for the rationale.

Lets say we're using a random access function, with function types like
func(K) func(*V) bool, we need a struct to handle it's own state (the
references to streams and such) and have a method that can pretend to be
that type. For iterators that type is iterValue [25], and it's invoke
method [26].The invoke method matches the signature for the
reflect.MakeFunc method, which can then produce a value with the right
function type. You can see makeIter [27] for how that call happens.

So say we have a mapValue type, with an invoke method, it would need to
take in it's key argument, and pass that to some ReStream object. ReStream
is an interface type, so we can contrive things so that we know it's
concrete type, and type assert it to it (say v.s.(mapReStream)), at which
point we pass it the key. Then we receive the stream of values as normal,
which can then be handled by an instance of iter.

Note, I've changed the user type to `func(K) func(*V) bool` because at this
point it can and could be a multimap, which would require it's own
iteration. We can always add additional variations if we like, but it
certainly complicates part 1 and part 5. At this point, once we can iterate
through multimaps directly, I'd be willing to wait for generics where most
of the complexity can be simplified into generic parent structs (like
beam.KV, or beam.Iterator, or beam.ReIterator or beam.MultiMap...).

This is far from fully complete, but it this should be able to get you
started I think?

Cheers,
Robert Burke
Who can't quite write these emails in his sleep, but did know everywhere to
look in the Go SDK.

[1]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto
[2]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/fn-execution/src/main/proto/beam_fn_api.proto
[3] https://s.apache.org/beam-fn-state-api-and-bundle-processing
[4] https://beam.apache.org/documentation/programming-guide/#side-inputs
[5]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L1186
[6]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L278
[7] https://s.apache.org/beam-go-sdk-design-rfc
[8]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/fn.go#L379
[9] hhttps://
github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/sideinput.go#L32
[10]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/funcx/fn.go#L318
[11]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/graph/bind.go#L179

[12]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L312

[13]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go
[14]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go#L464
[15]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L108
[16]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402

[17]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L48
[18]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L38
[19]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L254
[20]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go#L57
[21]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L36
[22]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/sideinput.go#L59
[23]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
[24]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L107
[25]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L98
[26]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L153
[27]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/go/pkg/beam/core/runtime/exec/input.go#L107


On Mon, Mar 29, 2021 at 5:38 PM Ahmet Altay <al...@google.com> wrote:

> Adding some folks who might be able to help: @Robert Burke
> <re...@google.com> @Kenneth Knowles <kl...@google.com> @Tyson Hamilton
> <ty...@google.com>
>
> On Mon, Mar 29, 2021 at 2:31 PM Miguel Anzo Palomo <
> miguel.anzo@wizeline.com> wrote:
>
>> Hi,
>> I was checking out this task BEAM-3293
>> <https://issues.apache.org/jira/browse/BEAM-3293> and I'm having some
>> issues fully understanding the idea of how side inputs work internally. Is
>> there any resource or specific example that can help to better understand
>> how they work and why/where the lazy map implementation would help so I can
>> get a better grasp of the task?
>>
>> Thanks
>>
>> --
>>
>> Miguel Angel Anzo Palomo | WIZELINE
>>
>> Software Engineer
>>
>> miguel.anzo@wizeline.com
>>
>> Remote Office
>>
>>
>>
>>
>>
>>
>>
>>
>> *This email and its contents (including any attachments) are being sent
>> toyou on the condition of confidentiality and may be protected by
>> legalprivilege. Access to this email by anyone other than the intended
>> recipientis unauthorized. If you are not the intended recipient, please
>> immediatelynotify the sender by replying to this message and delete the
>> materialimmediately from your system. Any further use, dissemination,
>> distributionor reproduction of this email is strictly prohibited. Further,
>> norepresentation is made with respect to any content contained in this
>> email.*
>
>

Re: [Apace BEAM Go improvements] Lazy map side inputs

Posted by Ahmet Altay <al...@google.com>.
Adding some folks who might be able to help: @Robert Burke <re...@google.com>
 @Kenneth Knowles <kl...@google.com> @Tyson Hamilton <ty...@google.com>

On Mon, Mar 29, 2021 at 2:31 PM Miguel Anzo Palomo <mi...@wizeline.com>
wrote:

> Hi,
> I was checking out this task BEAM-3293
> <https://issues.apache.org/jira/browse/BEAM-3293> and I'm having some
> issues fully understanding the idea of how side inputs work internally. Is
> there any resource or specific example that can help to better understand
> how they work and why/where the lazy map implementation would help so I can
> get a better grasp of the task?
>
> Thanks
>
> --
>
> Miguel Angel Anzo Palomo | WIZELINE
>
> Software Engineer
>
> miguel.anzo@wizeline.com
>
> Remote Office
>
>
>
>
>
>
>
>
> *This email and its contents (including any attachments) are being sent
> toyou on the condition of confidentiality and may be protected by
> legalprivilege. Access to this email by anyone other than the intended
> recipientis unauthorized. If you are not the intended recipient, please
> immediatelynotify the sender by replying to this message and delete the
> materialimmediately from your system. Any further use, dissemination,
> distributionor reproduction of this email is strictly prohibited. Further,
> norepresentation is made with respect to any content contained in this
> email.*