You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Brian Hulette <bh...@google.com> on 2021/09/15 18:10:27 UTC

Re: [PROPOSAL] Projection pushdown in Beam Java

Thanks for the proposal Kyle! I wanted to write up some thoughts around the
portable proposal (Option 2), and it grew well beyond the size of a doc
comment.

The basic requirements for this work, which are satisfied by Option 1, are:
- to allow pushdown into *Java SDK* sources
- based on field accesses in *Java SDK* transforms
- when building a pipeline with the *Java SDK*.

However down the road I would really like to see all of those requirements
go from *Java SDK* to *any SDK*. It doesn't need to happen right away, but
it would be great to avoid going down a path that's not amenable to such an
approach in the future. So it's worth thinking about what the "any SDK"
approach will look like now. (Presumably others feel this way as well, thus
Option 2).

Thinking about a portable version, we can break down these requirements
further:
- To support pushdown into *any SDK* sources we need
  a) a portable way to define (or discover) what projections a source is
capable of
  b) a portable way to modify the source to perform those projections
- To support tracking field accesses in *any SDK* transforms we need
  c) a portable way to annotate field accesses (this isn't so bad, we know
how to annotate transforms, but the devil is in the details I suppose).
- To support pushdown when building a pipeline with *any SDK* we need
  d) some SDK-agnostic code for applying the pushdown

I think (a,b,c) are solvable, but they depend on (d), which is a big
unknown (to me). It looks like Option 2 pursues putting this pushdown code
in runners, and having them operate on the Runner API protos directly. I
have a few concerns with this:
- It precludes having complex logic for (a,b) in the SDKs. This was touched
on in your document already [1]. I don't think it's that problematic for
projection, but as we look forward to predicate pushdown I think we'll need
to allow complex logic in the SDK for discovering supported predicates
(e.g. see the logic for predicate discovery in BigQuery [2]).
- Will individual runners be responsible for implementing this? We could
put the logic in runners-core-construction-java to re-use it across
JVM-based runners, but what about others? Does this just move the code
duplication from the SDK side to the runner side?
- Finally, Option 1 is not really a stepping stone to this approach.

An alternative portable approach might be to have an intermediate cross-SDK
and cross-runner "optimizer", which (optionally) runs between the SDK and
the runner. It could send expansion requests back to SDKs to "re-expand"
sources with projections (and later predicates) applied. This would address
all of the above:
- Since we make expansion requests to apply projections, the SDKs can
implement complex logic for (a,b).
- The pushdown code isn't in a runner or an SDK, we'd only need one
optimizer implementation.
- Option 1 could be a stepping stone to this approach. The optimizer could
be implemented with code from Option 1, if it's written with this in mind.

This does have some downsides:
- Sources are expanded twice. IO authors will need to avoid side effects in
expansion methods, and any expensive logic will be executed twice.
- It adds a Java dependency for non-Java SDKs (It is cleanly separable
though: if you don't want pushdown you don't need Java).

I'm curious what others think about this approach. Is it tenable? Am I
missing something?

Thanks!
Brian

[1]
https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
[2]
https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117

On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com> wrote:

> Thanks, I made a pass over Option 2
>
> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com> wrote:
>
>> Hi again everyone, thanks for all of your feedback. I've almost
>> completely rewritten the document to include another design option and
>> address more possibilities and challenges. Please take a look.
>>
>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com> wrote:
>>
>>> Very happy to see we will have pushdown optimizations for java
>>> pipelines! Thanks for sharing the proposal.
>>>
>>> Thanks,
>>> XInyu
>>>
>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>>
>>>> Thanks Kyle, very promising. I left some comments.
>>>>
>>>> —
>>>> Alexey
>>>>
>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
>>>>
>>>> Thanks, I took a look at it.
>>>>
>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <kc...@google.com> wrote:
>>>>
>>>>> Hi Beam devs,
>>>>>
>>>>> I'm back with another proposal involving Schema-based optimization in
>>>>> the Beam Java SDK. This one builds on the ideas in my previous proposal and
>>>>> is broader in scope. Please leave comments if this area is of interest to
>>>>> you.
>>>>>
>>>>>
>>>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>>>>>
>>>>> Thank you,
>>>>> Kyle
>>>>>
>>>>
>>>>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Sep 27, 2021 at 1:45 PM Kyle Weaver <kc...@google.com> wrote:
>>
>> My primary concern with Option 1 has been that it does not seem a
>> stepping stone for anything portable/multi-language, and as we'll
>> definitely want to support that (IMHO sooner rather than later) I
>> think whatever we build should go in that direction.
>
>
> We can resolve this by designing a common set of interfaces that can be used by either option 1 or 3. There would need to be an interface for producers and another for consumers of projections. For producers, the PushdownProjector [1] interface I previously added is limited to PTransforms, but as Brian suggested, we should make a generic interface so we can implement it on DoFns, etc. as well. For consumers, we would similarly be able to implement the interface on both transforms and DoFns. For DoFns, we could implement the new interface using the existing FieldAccess descriptor annotation. I uploaded a draft PR to show what I mean [2]. We could use these interfaces directly for option 1, and translate them into PTransform annotations for option 3.

Yes. The key to make sure this is compatible with both options is to
ensure these interfaces can all be called prior to optimization (or,
for actuation, on the workers).

> Once we have this common set of interfaces, everything else is just an "implementation detail." Options 1 and 3 are then not contradictory. They're both useful, and there's no reason we couldn't easily implement both of them. This way we can cover both the legacy Java and portable/cross-language use cases.

+1. (Given the fact that we pass through the portable pipeline protos
even for legacy jobs, I don't know what there is to be gained to
having separate implementations. I suppose the Java implementation
isn't as amenable to easily plumbing things through translation, so
perhaps that could simplify getting an initial prototype off the
ground.)

> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/PushdownProjector.java
> [2] https://github.com/apache/beam/pull/15599
>
> On Mon, Sep 27, 2021 at 10:04 AM Andrew Pilloud <ap...@google.com> wrote:
>>
>> Luke's suggestion matches what is currently in SQL. An IO is given a filter in a canonical format (an array of ANDs) and it returns back a subset of the filter it doesn't support. An initial implementation could provide the IO with the filter and let it run whatever it can without returning the supported set. For many cases there is little cost to running the filter multiple times, the primary reason for returning the unsupported subset is actually for additional projection pushdown.
>>
>> On Mon, Sep 20, 2021 at 4:19 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>> Instead of having the source state its capabilities of what it supports, it might be easier to say what v1 of pushdown is and then have sources be able convert that spec into something it supports, possibly doing simplifications over the expression.
>>>
>>> For example, the spec could be FILTER with AND and OR. Then a specific implementation could choose to not support OR and detect that OR is being used and decide to perform the filtering after the read has been done while if it got a filter with only ANDs then it could push it down. As long as we provide a generic library to convert any filter expression into a function that can be used to perform the filtering after the read has been done it would be pretty easy for IOs to adopt support for whatever they can actually do.
>>>
>>> On Mon, Sep 20, 2021 at 4:10 PM Brian Hulette <bh...@google.com> wrote:
>>>>
>>>> The other drawback of Option 2 is that we need a portable way to encode information about projections supported by a source, i.e.:
>>>>
>>>>   // Describes which projections are supported by this ParDo.
>>>>   message ProjectSupport {
>>>>     // Whether projection is supported at all. If false, no projections are supported.
>>>>     bool supports_projection = 1;
>>>>     // Whether a projection that outputs fields in a different order than its input is supported.
>>>>     bool supports_field_reordering = 2;
>>>>   }
>>>>
>>>> This looks reasonable for projection but I don't think it's feasible looking forward to predicate pushdown, where there can be much more variability in what a source supports.
>>>>
>>>>
>>>> On Mon, Sep 20, 2021 at 3:35 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>>
>>>>> On Mon, Sep 20, 2021 at 3:26 PM Luke Cwik <lc...@google.com> wrote:
>>>>> >
>>>>> > We could make SDKs that support projection/pushdown/... be required to expose an expansion like API as part of the SDK container. This way we don't have to worry about the original expansion service that may have produced the transforms in that case.
>>>>>
>>>>> Yes, as mentioned this would mean attaching environments to composites as well.
>>>>>
>>>>> > As for "d) some SDK-agnostic code for applying the pushdown", can we not have a runner say "filter on field X and Y" in an agnostic way as part of the ProcessBundleDescriptor and then have the SDK apply the optimization using SDK specific code? This way the SDK would be responsible for converting the generic representation into the IO specific representation.
>>>>>
>>>>> That is precisely what my implementation does. The only missing bit is
>>>>> the ability to provide alternative expansions for composite operations
>>>>> (pre-fusion and other optimizations, of course). But given the
>>>>> inventory of sources, I'm not convinced that's needed. (As we've
>>>>> discussed, a runner-independent way of providing multiple alternative
>>>>> expansions for a composite could be provided anyway.)
>>>>>
>>>>>
>>>>> > On Thu, Sep 16, 2021 at 5:01 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>> >>
>>>>> >>  On Thu, Sep 16, 2021 at 3:45 PM Kyle Weaver <kc...@google.com> wrote:
>>>>> >> >>
>>>>> >> >> I think this
>>>>> >> >> came up earlier, but our discussion could be informed by concrete
>>>>> >> >> examples of 4-5 sources that support pushdown and provide sufficient
>>>>> >> >> diversity.
>>>>> >> >
>>>>> >> > I looked at the four IOs that support pushdown in BeamSqlTable: BigQuery, BigTable, MongoDB, and Parquet. Of the four, only ParquetIO expands into a simple ParDo(DoFn).
>>>>> >>
>>>>> >> Interesting. In Python ParquetIO is the complicated one that expands
>>>>> >> into a Read.from(BoundedSource) followed by a DoFn.
>>>>> >>
>>>>> >> > The other three all expand into Read.from(BoundedSource).
>>>>> >>
>>>>> >> This is the nice, trivial case where there is no composite structure
>>>>> >> to worry about. Do we have any examples where we'd need additional
>>>>> >> complexity to be able to expand composites differently?
>>>>> >>
>>>>> >> > This introduces yet another layer of indirection. BoundedSource is where we must receive the pushdown request, but the BoundedSourceAsSDFWrapperFn is the actual DoFn, so we would need to pass pushdown information from BoundedSourceAsSDFWrapperFn to BoundedSource.
>>>>> >>
>>>>> >> I don't think this extra level of indirection is a problem. It's what
>>>>> >> one would expect of a shim whose purpose is to wrap one API in
>>>>> >> another, and should be quite straightforward.
>>>>> >>
>>>>> >> > This is a symptom of a larger problem: option 3 requires plumbing pushdown information through numerous layers of the Beam stack, from the protos to the SDK worker to the SDK’s DoFn implementation, and finally all the way through each IO implementation. In contrast, option 1 is entirely self-contained. It requires one method for pipeline optimization, and one trivial withProjectionPushdown method per IO.
>>>>> >>
>>>>> >> It's true that an option scoped to a single language/SDK can be more
>>>>> >> self-contained, but the whole point is that we don't want to be so
>>>>> >> self-contained. Anything that operates at a language-agnostic level
>>>>> >> requires plumbing between the user code and the protos, but this is
>>>>> >> O(1) work and you're done (<200 lines for Python) and are back to just
>>>>> >> writing a projection method per IO (or, at most, per step in the IO,
>>>>> >> though it looks like that's usually 1 per the four sources mentioned
>>>>> >> above).
>>>>> >>
>>>>> >>
>>>>> >> > Whether option 1 is a stepping stone to the portable/cross-language solution remains to be seen; it depends on the portable/cross-language solution we end up choosing. But it's the simplest solution to the problem I set out to solve.
>>>>> >> >
>>>>> >> >
>>>>> >> >
>>>>> >> > On Thu, Sep 16, 2021 at 12:59 PM Brian Hulette <bh...@google.com> wrote:
>>>>> >> >>>
>>>>> >> >>> This would be a little bit like the environment
>>>>> >> >>> (though it would also get attached to composites); maybe a reference
>>>>> >> >>> to docker image that could get started up and communicated with?
>>>>> >> >>
>>>>> >> >>
>>>>> >> >> I was thinking along similar lines. The idea I had was that composites from the non-submitting SDK could be annotated with the expansion service that they came from, and we'd have a "loopback" service for the submitting SDK transforms. But that would require that the expansion services are still running, or we include enough information to start them up again.
>>>>> >> >>
>>>>> >> >>> Note that this does not obviate the need to come up with some
>>>>> >> >>> language-agnostic way of defining the projections/predicates one wants
>>>>> >> >>> to perform, but it does let one indicate whether one supports a given
>>>>> >> >>> projection in code rather than having to come up with a language for
>>>>> >> >>> that as well. The primary advantage is that it allows one to act on
>>>>> >> >>> composites rather than at the level of primitives which is nice.
>>>>> >> >>> Whether this is necessary depends on how often full expansions (rather
>>>>> >> >>> than just the primitives) differ when a projection/predicate is pushed
>>>>> >> >>> down, and I have yet to come up with an example of that. (I think this
>>>>> >> >>> came up earlier, but our discussion could be informed by concrete
>>>>> >> >>> examples of 4-5 sources that support pushdown and provide sufficient
>>>>> >> >>> diversity.)
>>>>> >> >>
>>>>> >> >>
>>>>> >> >> Thanks, this articulates the advantages well. I don't think we can avoid having a language-agnostic way to represent projections/predicates (there may be something open source we can leverage or at least model here, e.g. I'm keeping an eye on Substrait [1,2]).
>>>>> >> >> I think you're probably right that it's rare a source with a pushdown applied will need a different set of primitives. So the biggest advantage of Option 4 is that it lets you articulate supported pushdowns in code. If we create an entirely new "pushdown" service, maybe it could just handle that instead of doing the full expansion.
>>>>> >> >>
>>>>> >> >> [1] https://substrait.io/
>>>>> >> >> [2] https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E
>>>>> >> >>
>>>>> >> >> On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>> >> >>>
>>>>> >> >>> The re-expansion idea is an interesting one. Let's call this Option 4.
>>>>> >> >>> The main question is how it would be invoked.
>>>>> >> >>>
>>>>> >> >>> As well as adding complexity, I don't think it would work to do it as
>>>>> >> >>> part of the SDK's submission to the runner. For one thing, it'd be
>>>>> >> >>> best if we could just fire-and-forget, even writing the spec to disk
>>>>> >> >>> and then having the runner pick it up later, and we'd have to figure
>>>>> >> >>> out how to make this work with interactive and similar services that
>>>>> >> >>> act as runners but are really proxies for other runners. The other
>>>>> >> >>> problem is that the submitting SDK may not be the SDK that generated
>>>>> >> >>> the transform.
>>>>> >> >>>
>>>>> >> >>> The other option would be to have some kind of a "pushdown service"
>>>>> >> >>> that would be referenced on the transform and could be invoked to do
>>>>> >> >>> the re-expansion. This would be a little bit like the environment
>>>>> >> >>> (though it would also get attached to composites); maybe a reference
>>>>> >> >>> to docker image that could get started up and communicated with?
>>>>> >> >>> (Maybe the SDK image itself could be used with an alternative
>>>>> >> >>> entrypoint? Assuming docker-on-docker works well...) Might get a bit
>>>>> >> >>> heavy-weight, but maybe we make it optional (though I'd rather it be
>>>>> >> >>> something you get by default).
>>>>> >> >>>
>>>>> >> >>> Definitely worth expanding more.
>>>>> >> >>>
>>>>> >> >>>
>>>>> >> >>>
>>>>> >> >>> My primary concern with Option 1 has been that it does not seem a
>>>>> >> >>> stepping stone for anything portable/multi-language, and as we'll
>>>>> >> >>> definitely want to support that (IMHO sooner rather than later) I
>>>>> >> >>> think whatever we build should go in that direction.
>>>>> >> >>>
>>>>> >> >>> Note that this does not obviate the need to come up with some
>>>>> >> >>> language-agnostic way of defining the projections/predicates one wants
>>>>> >> >>> to perform, but it does let one indicate whether one supports a given
>>>>> >> >>> projection in code rather than having to come up with a language for
>>>>> >> >>> that as well. The primary advantage is that it allows one to act on
>>>>> >> >>> composites rather than at the level of primitives which is nice.
>>>>> >> >>> Whether this is necessary depends on how often full expansions (rather
>>>>> >> >>> than just the primitives) differ when a projection/predicate is pushed
>>>>> >> >>> down, and I have yet to come up with an example of that. (I think this
>>>>> >> >>> came up earlier, but our discussion could be informed by concrete
>>>>> >> >>> examples of 4-5 sources that support pushdown and provide sufficient
>>>>> >> >>> diversity.)
>>>>> >> >>>
>>>>> >> >>> I do think Option 3 could be a stepping stone to Option 4, viewing it
>>>>> >> >>> as a mechanism for "simple" pushdowns to be done inline rather than a
>>>>> >> >>> via remote execution.
>>>>> >> >>>
>>>>> >> >>> - Robert
>>>>> >> >>>
>>>>> >> >>>
>>>>> >> >>> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com> wrote:
>>>>> >> >>> >
>>>>> >> >>> > Oops, I accidentally hit the send button...
>>>>> >> >>> >
>>>>> >> >>> > As I was saying, I think that's an excellent idea. I discussed this with Andrew previously and we came to a similar conclusion, that re-expansion is more flexible than annotations, especially when it comes to predicate pushdown.
>>>>> >> >>> >
>>>>> >> >>> > Robert made a prototype [1] of Option 2 in the Python SDK that should answer some of your questions about that option. Specifically, for your requirement d), the SDK worker could apply the pushdown before processing. Since SDK workers are shared across runners, that would mean the individual runners themselves would not have to re-implement pushdown. This approach has other drawbacks, though, which I have enumerated in my design doc.
>>>>> >> >>> >
>>>>> >> >>> > [1] https://github.com/apache/beam/pull/15351
>>>>> >> >>> >
>>>>> >> >>> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com> wrote:
>>>>> >> >>> >>
>>>>> >> >>> >> Hi Brian. I think that's an excele
>>>>> >> >>> >>
>>>>> >> >>> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <bh...@google.com> wrote:
>>>>> >> >>> >>>
>>>>> >> >>> >>> Thanks for the proposal Kyle! I wanted to write up some thoughts around the portable proposal (Option 2), and it grew well beyond the size of a doc comment.
>>>>> >> >>> >>>
>>>>> >> >>> >>> The basic requirements for this work, which are satisfied by Option 1, are:
>>>>> >> >>> >>> - to allow pushdown into *Java SDK* sources
>>>>> >> >>> >>> - based on field accesses in *Java SDK* transforms
>>>>> >> >>> >>> - when building a pipeline with the *Java SDK*.
>>>>> >> >>> >>>
>>>>> >> >>> >>> However down the road I would really like to see all of those requirements go from *Java SDK* to *any SDK*. It doesn't need to happen right away, but it would be great to avoid going down a path that's not amenable to such an approach in the future. So it's worth thinking about what the "any SDK" approach will look like now. (Presumably others feel this way as well, thus Option 2).
>>>>> >> >>> >>>
>>>>> >> >>> >>> Thinking about a portable version, we can break down these requirements further:
>>>>> >> >>> >>> - To support pushdown into *any SDK* sources we need
>>>>> >> >>> >>>   a) a portable way to define (or discover) what projections a source is capable of
>>>>> >> >>> >>>   b) a portable way to modify the source to perform those projections
>>>>> >> >>> >>> - To support tracking field accesses in *any SDK* transforms we need
>>>>> >> >>> >>>   c) a portable way to annotate field accesses (this isn't so bad, we know how to annotate transforms, but the devil is in the details I suppose).
>>>>> >> >>> >>> - To support pushdown when building a pipeline with *any SDK* we need
>>>>> >> >>> >>>   d) some SDK-agnostic code for applying the pushdown
>>>>> >> >>> >>>
>>>>> >> >>> >>> I think (a,b,c) are solvable, but they depend on (d), which is a big unknown (to me). It looks like Option 2 pursues putting this pushdown code in runners, and having them operate on the Runner API protos directly. I have a few concerns with this:
>>>>> >> >>> >>> - It precludes having complex logic for (a,b) in the SDKs. This was touched on in your document already [1]. I don't think it's that problematic for projection, but as we look forward to predicate pushdown I think we'll need to allow complex logic in the SDK for discovering supported predicates (e.g. see the logic for predicate discovery in BigQuery [2]).
>>>>> >> >>> >>> - Will individual runners be responsible for implementing this? We could put the logic in runners-core-construction-java to re-use it across JVM-based runners, but what about others? Does this just move the code duplication from the SDK side to the runner side?
>>>>> >> >>> >>> - Finally, Option 1 is not really a stepping stone to this approach.
>>>>> >> >>> >>>
>>>>> >> >>> >>> An alternative portable approach might be to have an intermediate cross-SDK and cross-runner "optimizer", which (optionally) runs between the SDK and the runner. It could send expansion requests back to SDKs to "re-expand" sources with projections (and later predicates) applied. This would address all of the above:
>>>>> >> >>> >>> - Since we make expansion requests to apply projections, the SDKs can implement complex logic for (a,b).
>>>>> >> >>> >>> - The pushdown code isn't in a runner or an SDK, we'd only need one optimizer implementation.
>>>>> >> >>> >>> - Option 1 could be a stepping stone to this approach. The optimizer could be implemented with code from Option 1, if it's written with this in mind.
>>>>> >> >>> >>>
>>>>> >> >>> >>> This does have some downsides:
>>>>> >> >>> >>> - Sources are expanded twice. IO authors will need to avoid side effects in expansion methods, and any expensive logic will be executed twice.
>>>>> >> >>> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly separable though: if you don't want pushdown you don't need Java).
>>>>> >> >>> >>>
>>>>> >> >>> >>> I'm curious what others think about this approach. Is it tenable? Am I missing something?
>>>>> >> >>> >>>
>>>>> >> >>> >>> Thanks!
>>>>> >> >>> >>> Brian
>>>>> >> >>> >>>
>>>>> >> >>> >>> [1] https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
>>>>> >> >>> >>> [2] https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>>>>> >> >>> >>>
>>>>> >> >>> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com> wrote:
>>>>> >> >>> >>>>
>>>>> >> >>> >>>> Thanks, I made a pass over Option 2
>>>>> >> >>> >>>>
>>>>> >> >>> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com> wrote:
>>>>> >> >>> >>>>>
>>>>> >> >>> >>>>> Hi again everyone, thanks for all of your feedback. I've almost completely rewritten the document to include another design option and address more possibilities and challenges. Please take a look.
>>>>> >> >>> >>>>>
>>>>> >> >>> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com> wrote:
>>>>> >> >>> >>>>>>
>>>>> >> >>> >>>>>> Very happy to see we will have pushdown optimizations for java pipelines! Thanks for sharing the proposal.
>>>>> >> >>> >>>>>>
>>>>> >> >>> >>>>>> Thanks,
>>>>> >> >>> >>>>>> XInyu
>>>>> >> >>> >>>>>>
>>>>> >> >>> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <ar...@gmail.com> wrote:
>>>>> >> >>> >>>>>>>
>>>>> >> >>> >>>>>>> Thanks Kyle, very promising. I left some comments.
>>>>> >> >>> >>>>>>>
>>>>> >> >>> >>>>>>> —
>>>>> >> >>> >>>>>>> Alexey
>>>>> >> >>> >>>>>>>
>>>>> >> >>> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
>>>>> >> >>> >>>>>>>
>>>>> >> >>> >>>>>>> Thanks, I took a look at it.
>>>>> >> >>> >>>>>>>
>>>>> >> >>> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <kc...@google.com> wrote:
>>>>> >> >>> >>>>>>>>
>>>>> >> >>> >>>>>>>> Hi Beam devs,
>>>>> >> >>> >>>>>>>>
>>>>> >> >>> >>>>>>>> I'm back with another proposal involving Schema-based optimization in the Beam Java SDK. This one builds on the ideas in my previous proposal and is broader in scope. Please leave comments if this area is of interest to you.
>>>>> >> >>> >>>>>>>>
>>>>> >> >>> >>>>>>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>>>>> >> >>> >>>>>>>>
>>>>> >> >>> >>>>>>>> Thank you,
>>>>> >> >>> >>>>>>>> Kyle
>>>>> >> >>> >>>>>>>
>>>>> >> >>> >>>>>>>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Kyle Weaver <kc...@google.com>.
>
> My primary concern with Option 1 has been that it does not seem a
> stepping stone for anything portable/multi-language, and as we'll
> definitely want to support that (IMHO sooner rather than later) I
> think whatever we build should go in that direction.


We can resolve this by designing a common set of interfaces that can be
used by either option 1 or 3. There would need to be an interface for
producers and another for consumers of projections. For producers, the
PushdownProjector [1] interface I previously added is limited to
PTransforms, but as Brian suggested, we should make a generic interface
so we can implement it on DoFns, etc. as well. For consumers, we would
similarly be able to implement the interface on both transforms and DoFns.
For DoFns, we could implement the new interface using the existing
FieldAccess descriptor annotation. I uploaded a draft PR to show what I
mean [2]. We could use these interfaces directly for option 1, and
translate them into PTransform annotations for option 3.

Once we have this common set of interfaces, everything else is just an
"implementation detail." Options 1 and 3 are then not contradictory.
They're both useful, and there's no reason we couldn't easily implement
both of them. This way we can cover both the legacy Java and
portable/cross-language use cases.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/PushdownProjector.java
[2] https://github.com/apache/beam/pull/15599

On Mon, Sep 27, 2021 at 10:04 AM Andrew Pilloud <ap...@google.com> wrote:

> Luke's suggestion matches what is currently in SQL. An IO is given a
> filter in a canonical format (an array of ANDs) and it returns back a
> subset of the filter it doesn't support. An initial implementation could
> provide the IO with the filter and let it run whatever it can without
> returning the supported set. For many cases there is little cost to running
> the filter multiple times, the primary reason for returning the unsupported
> subset is actually for additional projection pushdown.
>
> On Mon, Sep 20, 2021 at 4:19 PM Luke Cwik <lc...@google.com> wrote:
>
>> Instead of having the source state its capabilities of what it supports,
>> it might be easier to say what v1 of pushdown is and then have sources be
>> able convert that spec into something it supports, possibly doing
>> simplifications over the expression.
>>
>> For example, the spec could be FILTER with AND and OR. Then a specific
>> implementation could choose to not support OR and detect that OR is being
>> used and decide to perform the filtering after the read has been done while
>> if it got a filter with only ANDs then it could push it down. As long as we
>> provide a generic library to convert any filter expression into a function
>> that can be used to perform the filtering after the read has been done it
>> would be pretty easy for IOs to adopt support for whatever they can
>> actually do.
>>
>> On Mon, Sep 20, 2021 at 4:10 PM Brian Hulette <bh...@google.com>
>> wrote:
>>
>>> The other drawback of Option 2 is that we need a portable way to encode
>>> information about projections supported by a source, i.e.:
>>>
>>>   // Describes which projections are supported by this ParDo.
>>>   message ProjectSupport {
>>>     // Whether projection is supported at all. If false, no projections
>>> are supported.
>>>     bool supports_projection = 1;
>>>     // Whether a projection that outputs fields in a different order
>>> than its input is supported.
>>>     bool supports_field_reordering = 2;
>>>   }
>>>
>>> This looks reasonable for projection but I don't think it's feasible
>>> looking forward to predicate pushdown, where there can be much
>>> more variability in what a source supports.
>>>
>>>
>>> On Mon, Sep 20, 2021 at 3:35 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Mon, Sep 20, 2021 at 3:26 PM Luke Cwik <lc...@google.com> wrote:
>>>> >
>>>> > We could make SDKs that support projection/pushdown/... be required
>>>> to expose an expansion like API as part of the SDK container. This way we
>>>> don't have to worry about the original expansion service that may have
>>>> produced the transforms in that case.
>>>>
>>>> Yes, as mentioned this would mean attaching environments to composites
>>>> as well.
>>>>
>>>> > As for "d) some SDK-agnostic code for applying the pushdown", can we
>>>> not have a runner say "filter on field X and Y" in an agnostic way as part
>>>> of the ProcessBundleDescriptor and then have the SDK apply the optimization
>>>> using SDK specific code? This way the SDK would be responsible for
>>>> converting the generic representation into the IO specific representation.
>>>>
>>>> That is precisely what my implementation does. The only missing bit is
>>>> the ability to provide alternative expansions for composite operations
>>>> (pre-fusion and other optimizations, of course). But given the
>>>> inventory of sources, I'm not convinced that's needed. (As we've
>>>> discussed, a runner-independent way of providing multiple alternative
>>>> expansions for a composite could be provided anyway.)
>>>
>>>
>>>> > On Thu, Sep 16, 2021 at 5:01 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>> >>
>>>> >>  On Thu, Sep 16, 2021 at 3:45 PM Kyle Weaver <kc...@google.com>
>>>> wrote:
>>>> >> >>
>>>> >> >> I think this
>>>> >> >> came up earlier, but our discussion could be informed by concrete
>>>> >> >> examples of 4-5 sources that support pushdown and provide
>>>> sufficient
>>>> >> >> diversity.
>>>> >> >
>>>> >> > I looked at the four IOs that support pushdown in BeamSqlTable:
>>>> BigQuery, BigTable, MongoDB, and Parquet. Of the four, only ParquetIO
>>>> expands into a simple ParDo(DoFn).
>>>> >>
>>>> >> Interesting. In Python ParquetIO is the complicated one that expands
>>>> >> into a Read.from(BoundedSource) followed by a DoFn.
>>>> >>
>>>> >> > The other three all expand into Read.from(BoundedSource).
>>>> >>
>>>> >> This is the nice, trivial case where there is no composite structure
>>>> >> to worry about. Do we have any examples where we'd need additional
>>>> >> complexity to be able to expand composites differently?
>>>> >>
>>>> >> > This introduces yet another layer of indirection. BoundedSource is
>>>> where we must receive the pushdown request, but the
>>>> BoundedSourceAsSDFWrapperFn is the actual DoFn, so we would need to pass
>>>> pushdown information from BoundedSourceAsSDFWrapperFn to BoundedSource.
>>>> >>
>>>> >> I don't think this extra level of indirection is a problem. It's what
>>>> >> one would expect of a shim whose purpose is to wrap one API in
>>>> >> another, and should be quite straightforward.
>>>> >>
>>>> >> > This is a symptom of a larger problem: option 3 requires plumbing
>>>> pushdown information through numerous layers of the Beam stack, from the
>>>> protos to the SDK worker to the SDK’s DoFn implementation, and finally all
>>>> the way through each IO implementation. In contrast, option 1 is entirely
>>>> self-contained. It requires one method for pipeline optimization, and one
>>>> trivial withProjectionPushdown method per IO.
>>>> >>
>>>> >> It's true that an option scoped to a single language/SDK can be more
>>>> >> self-contained, but the whole point is that we don't want to be so
>>>> >> self-contained. Anything that operates at a language-agnostic level
>>>> >> requires plumbing between the user code and the protos, but this is
>>>> >> O(1) work and you're done (<200 lines for Python) and are back to
>>>> just
>>>> >> writing a projection method per IO (or, at most, per step in the IO,
>>>> >> though it looks like that's usually 1 per the four sources mentioned
>>>> >> above).
>>>> >>
>>>> >>
>>>> >> > Whether option 1 is a stepping stone to the
>>>> portable/cross-language solution remains to be seen; it depends on the
>>>> portable/cross-language solution we end up choosing. But it's the simplest
>>>> solution to the problem I set out to solve.
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> > On Thu, Sep 16, 2021 at 12:59 PM Brian Hulette <
>>>> bhulette@google.com> wrote:
>>>> >> >>>
>>>> >> >>> This would be a little bit like the environment
>>>> >> >>> (though it would also get attached to composites); maybe a
>>>> reference
>>>> >> >>> to docker image that could get started up and communicated with?
>>>> >> >>
>>>> >> >>
>>>> >> >> I was thinking along similar lines. The idea I had was that
>>>> composites from the non-submitting SDK could be annotated with the
>>>> expansion service that they came from, and we'd have a "loopback" service
>>>> for the submitting SDK transforms. But that would require that the
>>>> expansion services are still running, or we include enough information to
>>>> start them up again.
>>>> >> >>
>>>> >> >>> Note that this does not obviate the need to come up with some
>>>> >> >>> language-agnostic way of defining the projections/predicates one
>>>> wants
>>>> >> >>> to perform, but it does let one indicate whether one supports a
>>>> given
>>>> >> >>> projection in code rather than having to come up with a language
>>>> for
>>>> >> >>> that as well. The primary advantage is that it allows one to act
>>>> on
>>>> >> >>> composites rather than at the level of primitives which is nice.
>>>> >> >>> Whether this is necessary depends on how often full expansions
>>>> (rather
>>>> >> >>> than just the primitives) differ when a projection/predicate is
>>>> pushed
>>>> >> >>> down, and I have yet to come up with an example of that. (I
>>>> think this
>>>> >> >>> came up earlier, but our discussion could be informed by concrete
>>>> >> >>> examples of 4-5 sources that support pushdown and provide
>>>> sufficient
>>>> >> >>> diversity.)
>>>> >> >>
>>>> >> >>
>>>> >> >> Thanks, this articulates the advantages well. I don't think we
>>>> can avoid having a language-agnostic way to represent
>>>> projections/predicates (there may be something open source we can leverage
>>>> or at least model here, e.g. I'm keeping an eye on Substrait [1,2]).
>>>> >> >> I think you're probably right that it's rare a source with a
>>>> pushdown applied will need a different set of primitives. So the biggest
>>>> advantage of Option 4 is that it lets you articulate supported pushdowns in
>>>> code. If we create an entirely new "pushdown" service, maybe it could just
>>>> handle that instead of doing the full expansion.
>>>> >> >>
>>>> >> >> [1] https://substrait.io/
>>>> >> >> [2]
>>>> https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E
>>>> >> >>
>>>> >> >> On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <
>>>> robertwb@google.com> wrote:
>>>> >> >>>
>>>> >> >>> The re-expansion idea is an interesting one. Let's call this
>>>> Option 4.
>>>> >> >>> The main question is how it would be invoked.
>>>> >> >>>
>>>> >> >>> As well as adding complexity, I don't think it would work to do
>>>> it as
>>>> >> >>> part of the SDK's submission to the runner. For one thing, it'd
>>>> be
>>>> >> >>> best if we could just fire-and-forget, even writing the spec to
>>>> disk
>>>> >> >>> and then having the runner pick it up later, and we'd have to
>>>> figure
>>>> >> >>> out how to make this work with interactive and similar services
>>>> that
>>>> >> >>> act as runners but are really proxies for other runners. The
>>>> other
>>>> >> >>> problem is that the submitting SDK may not be the SDK that
>>>> generated
>>>> >> >>> the transform.
>>>> >> >>>
>>>> >> >>> The other option would be to have some kind of a "pushdown
>>>> service"
>>>> >> >>> that would be referenced on the transform and could be invoked
>>>> to do
>>>> >> >>> the re-expansion. This would be a little bit like the environment
>>>> >> >>> (though it would also get attached to composites); maybe a
>>>> reference
>>>> >> >>> to docker image that could get started up and communicated with?
>>>> >> >>> (Maybe the SDK image itself could be used with an alternative
>>>> >> >>> entrypoint? Assuming docker-on-docker works well...) Might get a
>>>> bit
>>>> >> >>> heavy-weight, but maybe we make it optional (though I'd rather
>>>> it be
>>>> >> >>> something you get by default).
>>>> >> >>>
>>>> >> >>> Definitely worth expanding more.
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>> My primary concern with Option 1 has been that it does not seem a
>>>> >> >>> stepping stone for anything portable/multi-language, and as we'll
>>>> >> >>> definitely want to support that (IMHO sooner rather than later) I
>>>> >> >>> think whatever we build should go in that direction.
>>>> >> >>>
>>>> >> >>> Note that this does not obviate the need to come up with some
>>>> >> >>> language-agnostic way of defining the projections/predicates one
>>>> wants
>>>> >> >>> to perform, but it does let one indicate whether one supports a
>>>> given
>>>> >> >>> projection in code rather than having to come up with a language
>>>> for
>>>> >> >>> that as well. The primary advantage is that it allows one to act
>>>> on
>>>> >> >>> composites rather than at the level of primitives which is nice.
>>>> >> >>> Whether this is necessary depends on how often full expansions
>>>> (rather
>>>> >> >>> than just the primitives) differ when a projection/predicate is
>>>> pushed
>>>> >> >>> down, and I have yet to come up with an example of that. (I
>>>> think this
>>>> >> >>> came up earlier, but our discussion could be informed by concrete
>>>> >> >>> examples of 4-5 sources that support pushdown and provide
>>>> sufficient
>>>> >> >>> diversity.)
>>>> >> >>>
>>>> >> >>> I do think Option 3 could be a stepping stone to Option 4,
>>>> viewing it
>>>> >> >>> as a mechanism for "simple" pushdowns to be done inline rather
>>>> than a
>>>> >> >>> via remote execution.
>>>> >> >>>
>>>> >> >>> - Robert
>>>> >> >>>
>>>> >> >>>
>>>> >> >>> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com>
>>>> wrote:
>>>> >> >>> >
>>>> >> >>> > Oops, I accidentally hit the send button...
>>>> >> >>> >
>>>> >> >>> > As I was saying, I think that's an excellent idea. I discussed
>>>> this with Andrew previously and we came to a similar conclusion, that
>>>> re-expansion is more flexible than annotations, especially when it comes to
>>>> predicate pushdown.
>>>> >> >>> >
>>>> >> >>> > Robert made a prototype [1] of Option 2 in the Python SDK that
>>>> should answer some of your questions about that option. Specifically, for
>>>> your requirement d), the SDK worker could apply the pushdown before
>>>> processing. Since SDK workers are shared across runners, that would mean
>>>> the individual runners themselves would not have to re-implement pushdown.
>>>> This approach has other drawbacks, though, which I have enumerated in my
>>>> design doc.
>>>> >> >>> >
>>>> >> >>> > [1] https://github.com/apache/beam/pull/15351
>>>> >> >>> >
>>>> >> >>> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <
>>>> kcweaver@google.com> wrote:
>>>> >> >>> >>
>>>> >> >>> >> Hi Brian. I think that's an excele
>>>> >> >>> >>
>>>> >> >>> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <
>>>> bhulette@google.com> wrote:
>>>> >> >>> >>>
>>>> >> >>> >>> Thanks for the proposal Kyle! I wanted to write up some
>>>> thoughts around the portable proposal (Option 2), and it grew well beyond
>>>> the size of a doc comment.
>>>> >> >>> >>>
>>>> >> >>> >>> The basic requirements for this work, which are satisfied by
>>>> Option 1, are:
>>>> >> >>> >>> - to allow pushdown into *Java SDK* sources
>>>> >> >>> >>> - based on field accesses in *Java SDK* transforms
>>>> >> >>> >>> - when building a pipeline with the *Java SDK*.
>>>> >> >>> >>>
>>>> >> >>> >>> However down the road I would really like to see all of
>>>> those requirements go from *Java SDK* to *any SDK*. It doesn't need to
>>>> happen right away, but it would be great to avoid going down a path that's
>>>> not amenable to such an approach in the future. So it's worth thinking
>>>> about what the "any SDK" approach will look like now. (Presumably others
>>>> feel this way as well, thus Option 2).
>>>> >> >>> >>>
>>>> >> >>> >>> Thinking about a portable version, we can break down these
>>>> requirements further:
>>>> >> >>> >>> - To support pushdown into *any SDK* sources we need
>>>> >> >>> >>>   a) a portable way to define (or discover) what projections
>>>> a source is capable of
>>>> >> >>> >>>   b) a portable way to modify the source to perform those
>>>> projections
>>>> >> >>> >>> - To support tracking field accesses in *any SDK* transforms
>>>> we need
>>>> >> >>> >>>   c) a portable way to annotate field accesses (this isn't
>>>> so bad, we know how to annotate transforms, but the devil is in the details
>>>> I suppose).
>>>> >> >>> >>> - To support pushdown when building a pipeline with *any
>>>> SDK* we need
>>>> >> >>> >>>   d) some SDK-agnostic code for applying the pushdown
>>>> >> >>> >>>
>>>> >> >>> >>> I think (a,b,c) are solvable, but they depend on (d), which
>>>> is a big unknown (to me). It looks like Option 2 pursues putting this
>>>> pushdown code in runners, and having them operate on the Runner API protos
>>>> directly. I have a few concerns with this:
>>>> >> >>> >>> - It precludes having complex logic for (a,b) in the SDKs.
>>>> This was touched on in your document already [1]. I don't think it's that
>>>> problematic for projection, but as we look forward to predicate pushdown I
>>>> think we'll need to allow complex logic in the SDK for discovering
>>>> supported predicates (e.g. see the logic for predicate discovery in
>>>> BigQuery [2]).
>>>> >> >>> >>> - Will individual runners be responsible for implementing
>>>> this? We could put the logic in runners-core-construction-java to re-use it
>>>> across JVM-based runners, but what about others? Does this just move the
>>>> code duplication from the SDK side to the runner side?
>>>> >> >>> >>> - Finally, Option 1 is not really a stepping stone to this
>>>> approach.
>>>> >> >>> >>>
>>>> >> >>> >>> An alternative portable approach might be to have an
>>>> intermediate cross-SDK and cross-runner "optimizer", which (optionally)
>>>> runs between the SDK and the runner. It could send expansion requests back
>>>> to SDKs to "re-expand" sources with projections (and later predicates)
>>>> applied. This would address all of the above:
>>>> >> >>> >>> - Since we make expansion requests to apply projections, the
>>>> SDKs can implement complex logic for (a,b).
>>>> >> >>> >>> - The pushdown code isn't in a runner or an SDK, we'd only
>>>> need one optimizer implementation.
>>>> >> >>> >>> - Option 1 could be a stepping stone to this approach. The
>>>> optimizer could be implemented with code from Option 1, if it's written
>>>> with this in mind.
>>>> >> >>> >>>
>>>> >> >>> >>> This does have some downsides:
>>>> >> >>> >>> - Sources are expanded twice. IO authors will need to avoid
>>>> side effects in expansion methods, and any expensive logic will be executed
>>>> twice.
>>>> >> >>> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly
>>>> separable though: if you don't want pushdown you don't need Java).
>>>> >> >>> >>>
>>>> >> >>> >>> I'm curious what others think about this approach. Is it
>>>> tenable? Am I missing something?
>>>> >> >>> >>>
>>>> >> >>> >>> Thanks!
>>>> >> >>> >>> Brian
>>>> >> >>> >>>
>>>> >> >>> >>> [1]
>>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
>>>> >> >>> >>> [2]
>>>> https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>>>> >> >>> >>>
>>>> >> >>> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com>
>>>> wrote:
>>>> >> >>> >>>>
>>>> >> >>> >>>> Thanks, I made a pass over Option 2
>>>> >> >>> >>>>
>>>> >> >>> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <
>>>> kcweaver@google.com> wrote:
>>>> >> >>> >>>>>
>>>> >> >>> >>>>> Hi again everyone, thanks for all of your feedback. I've
>>>> almost completely rewritten the document to include another design option
>>>> and address more possibilities and challenges. Please take a look.
>>>> >> >>> >>>>>
>>>> >> >>> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <
>>>> xinyuliu.us@gmail.com> wrote:
>>>> >> >>> >>>>>>
>>>> >> >>> >>>>>> Very happy to see we will have pushdown optimizations for
>>>> java pipelines! Thanks for sharing the proposal.
>>>> >> >>> >>>>>>
>>>> >> >>> >>>>>> Thanks,
>>>> >> >>> >>>>>> XInyu
>>>> >> >>> >>>>>>
>>>> >> >>> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>> >> >>> >>>>>>>
>>>> >> >>> >>>>>>> Thanks Kyle, very promising. I left some comments.
>>>> >> >>> >>>>>>>
>>>> >> >>> >>>>>>> —
>>>> >> >>> >>>>>>> Alexey
>>>> >> >>> >>>>>>>
>>>> >> >>> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com>
>>>> wrote:
>>>> >> >>> >>>>>>>
>>>> >> >>> >>>>>>> Thanks, I took a look at it.
>>>> >> >>> >>>>>>>
>>>> >> >>> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <
>>>> kcweaver@google.com> wrote:
>>>> >> >>> >>>>>>>>
>>>> >> >>> >>>>>>>> Hi Beam devs,
>>>> >> >>> >>>>>>>>
>>>> >> >>> >>>>>>>> I'm back with another proposal involving Schema-based
>>>> optimization in the Beam Java SDK. This one builds on the ideas in my
>>>> previous proposal and is broader in scope. Please leave comments if this
>>>> area is of interest to you.
>>>> >> >>> >>>>>>>>
>>>> >> >>> >>>>>>>>
>>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>>>> >> >>> >>>>>>>>
>>>> >> >>> >>>>>>>> Thank you,
>>>> >> >>> >>>>>>>> Kyle
>>>> >> >>> >>>>>>>
>>>> >> >>> >>>>>>>
>>>>
>>>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Andrew Pilloud <ap...@google.com>.
Luke's suggestion matches what is currently in SQL. An IO is given a filter
in a canonical format (an array of ANDs) and it returns back a subset of
the filter it doesn't support. An initial implementation could provide the
IO with the filter and let it run whatever it can without returning the
supported set. For many cases there is little cost to running the filter
multiple times, the primary reason for returning the unsupported subset is
actually for additional projection pushdown.

On Mon, Sep 20, 2021 at 4:19 PM Luke Cwik <lc...@google.com> wrote:

> Instead of having the source state its capabilities of what it supports,
> it might be easier to say what v1 of pushdown is and then have sources be
> able convert that spec into something it supports, possibly doing
> simplifications over the expression.
>
> For example, the spec could be FILTER with AND and OR. Then a specific
> implementation could choose to not support OR and detect that OR is being
> used and decide to perform the filtering after the read has been done while
> if it got a filter with only ANDs then it could push it down. As long as we
> provide a generic library to convert any filter expression into a function
> that can be used to perform the filtering after the read has been done it
> would be pretty easy for IOs to adopt support for whatever they can
> actually do.
>
> On Mon, Sep 20, 2021 at 4:10 PM Brian Hulette <bh...@google.com> wrote:
>
>> The other drawback of Option 2 is that we need a portable way to encode
>> information about projections supported by a source, i.e.:
>>
>>   // Describes which projections are supported by this ParDo.
>>   message ProjectSupport {
>>     // Whether projection is supported at all. If false, no projections
>> are supported.
>>     bool supports_projection = 1;
>>     // Whether a projection that outputs fields in a different order than
>> its input is supported.
>>     bool supports_field_reordering = 2;
>>   }
>>
>> This looks reasonable for projection but I don't think it's feasible
>> looking forward to predicate pushdown, where there can be much
>> more variability in what a source supports.
>>
>>
>> On Mon, Sep 20, 2021 at 3:35 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Mon, Sep 20, 2021 at 3:26 PM Luke Cwik <lc...@google.com> wrote:
>>> >
>>> > We could make SDKs that support projection/pushdown/... be required to
>>> expose an expansion like API as part of the SDK container. This way we
>>> don't have to worry about the original expansion service that may have
>>> produced the transforms in that case.
>>>
>>> Yes, as mentioned this would mean attaching environments to composites
>>> as well.
>>>
>>> > As for "d) some SDK-agnostic code for applying the pushdown", can we
>>> not have a runner say "filter on field X and Y" in an agnostic way as part
>>> of the ProcessBundleDescriptor and then have the SDK apply the optimization
>>> using SDK specific code? This way the SDK would be responsible for
>>> converting the generic representation into the IO specific representation.
>>>
>>> That is precisely what my implementation does. The only missing bit is
>>> the ability to provide alternative expansions for composite operations
>>> (pre-fusion and other optimizations, of course). But given the
>>> inventory of sources, I'm not convinced that's needed. (As we've
>>> discussed, a runner-independent way of providing multiple alternative
>>> expansions for a composite could be provided anyway.)
>>
>>
>>> > On Thu, Sep 16, 2021 at 5:01 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>> >>
>>> >>  On Thu, Sep 16, 2021 at 3:45 PM Kyle Weaver <kc...@google.com>
>>> wrote:
>>> >> >>
>>> >> >> I think this
>>> >> >> came up earlier, but our discussion could be informed by concrete
>>> >> >> examples of 4-5 sources that support pushdown and provide
>>> sufficient
>>> >> >> diversity.
>>> >> >
>>> >> > I looked at the four IOs that support pushdown in BeamSqlTable:
>>> BigQuery, BigTable, MongoDB, and Parquet. Of the four, only ParquetIO
>>> expands into a simple ParDo(DoFn).
>>> >>
>>> >> Interesting. In Python ParquetIO is the complicated one that expands
>>> >> into a Read.from(BoundedSource) followed by a DoFn.
>>> >>
>>> >> > The other three all expand into Read.from(BoundedSource).
>>> >>
>>> >> This is the nice, trivial case where there is no composite structure
>>> >> to worry about. Do we have any examples where we'd need additional
>>> >> complexity to be able to expand composites differently?
>>> >>
>>> >> > This introduces yet another layer of indirection. BoundedSource is
>>> where we must receive the pushdown request, but the
>>> BoundedSourceAsSDFWrapperFn is the actual DoFn, so we would need to pass
>>> pushdown information from BoundedSourceAsSDFWrapperFn to BoundedSource.
>>> >>
>>> >> I don't think this extra level of indirection is a problem. It's what
>>> >> one would expect of a shim whose purpose is to wrap one API in
>>> >> another, and should be quite straightforward.
>>> >>
>>> >> > This is a symptom of a larger problem: option 3 requires plumbing
>>> pushdown information through numerous layers of the Beam stack, from the
>>> protos to the SDK worker to the SDK’s DoFn implementation, and finally all
>>> the way through each IO implementation. In contrast, option 1 is entirely
>>> self-contained. It requires one method for pipeline optimization, and one
>>> trivial withProjectionPushdown method per IO.
>>> >>
>>> >> It's true that an option scoped to a single language/SDK can be more
>>> >> self-contained, but the whole point is that we don't want to be so
>>> >> self-contained. Anything that operates at a language-agnostic level
>>> >> requires plumbing between the user code and the protos, but this is
>>> >> O(1) work and you're done (<200 lines for Python) and are back to just
>>> >> writing a projection method per IO (or, at most, per step in the IO,
>>> >> though it looks like that's usually 1 per the four sources mentioned
>>> >> above).
>>> >>
>>> >>
>>> >> > Whether option 1 is a stepping stone to the portable/cross-language
>>> solution remains to be seen; it depends on the portable/cross-language
>>> solution we end up choosing. But it's the simplest solution to the problem
>>> I set out to solve.
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Thu, Sep 16, 2021 at 12:59 PM Brian Hulette <bh...@google.com>
>>> wrote:
>>> >> >>>
>>> >> >>> This would be a little bit like the environment
>>> >> >>> (though it would also get attached to composites); maybe a
>>> reference
>>> >> >>> to docker image that could get started up and communicated with?
>>> >> >>
>>> >> >>
>>> >> >> I was thinking along similar lines. The idea I had was that
>>> composites from the non-submitting SDK could be annotated with the
>>> expansion service that they came from, and we'd have a "loopback" service
>>> for the submitting SDK transforms. But that would require that the
>>> expansion services are still running, or we include enough information to
>>> start them up again.
>>> >> >>
>>> >> >>> Note that this does not obviate the need to come up with some
>>> >> >>> language-agnostic way of defining the projections/predicates one
>>> wants
>>> >> >>> to perform, but it does let one indicate whether one supports a
>>> given
>>> >> >>> projection in code rather than having to come up with a language
>>> for
>>> >> >>> that as well. The primary advantage is that it allows one to act
>>> on
>>> >> >>> composites rather than at the level of primitives which is nice.
>>> >> >>> Whether this is necessary depends on how often full expansions
>>> (rather
>>> >> >>> than just the primitives) differ when a projection/predicate is
>>> pushed
>>> >> >>> down, and I have yet to come up with an example of that. (I think
>>> this
>>> >> >>> came up earlier, but our discussion could be informed by concrete
>>> >> >>> examples of 4-5 sources that support pushdown and provide
>>> sufficient
>>> >> >>> diversity.)
>>> >> >>
>>> >> >>
>>> >> >> Thanks, this articulates the advantages well. I don't think we can
>>> avoid having a language-agnostic way to represent projections/predicates
>>> (there may be something open source we can leverage or at least model here,
>>> e.g. I'm keeping an eye on Substrait [1,2]).
>>> >> >> I think you're probably right that it's rare a source with a
>>> pushdown applied will need a different set of primitives. So the biggest
>>> advantage of Option 4 is that it lets you articulate supported pushdowns in
>>> code. If we create an entirely new "pushdown" service, maybe it could just
>>> handle that instead of doing the full expansion.
>>> >> >>
>>> >> >> [1] https://substrait.io/
>>> >> >> [2]
>>> https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E
>>> >> >>
>>> >> >> On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <
>>> robertwb@google.com> wrote:
>>> >> >>>
>>> >> >>> The re-expansion idea is an interesting one. Let's call this
>>> Option 4.
>>> >> >>> The main question is how it would be invoked.
>>> >> >>>
>>> >> >>> As well as adding complexity, I don't think it would work to do
>>> it as
>>> >> >>> part of the SDK's submission to the runner. For one thing, it'd be
>>> >> >>> best if we could just fire-and-forget, even writing the spec to
>>> disk
>>> >> >>> and then having the runner pick it up later, and we'd have to
>>> figure
>>> >> >>> out how to make this work with interactive and similar services
>>> that
>>> >> >>> act as runners but are really proxies for other runners. The other
>>> >> >>> problem is that the submitting SDK may not be the SDK that
>>> generated
>>> >> >>> the transform.
>>> >> >>>
>>> >> >>> The other option would be to have some kind of a "pushdown
>>> service"
>>> >> >>> that would be referenced on the transform and could be invoked to
>>> do
>>> >> >>> the re-expansion. This would be a little bit like the environment
>>> >> >>> (though it would also get attached to composites); maybe a
>>> reference
>>> >> >>> to docker image that could get started up and communicated with?
>>> >> >>> (Maybe the SDK image itself could be used with an alternative
>>> >> >>> entrypoint? Assuming docker-on-docker works well...) Might get a
>>> bit
>>> >> >>> heavy-weight, but maybe we make it optional (though I'd rather it
>>> be
>>> >> >>> something you get by default).
>>> >> >>>
>>> >> >>> Definitely worth expanding more.
>>> >> >>>
>>> >> >>>
>>> >> >>>
>>> >> >>> My primary concern with Option 1 has been that it does not seem a
>>> >> >>> stepping stone for anything portable/multi-language, and as we'll
>>> >> >>> definitely want to support that (IMHO sooner rather than later) I
>>> >> >>> think whatever we build should go in that direction.
>>> >> >>>
>>> >> >>> Note that this does not obviate the need to come up with some
>>> >> >>> language-agnostic way of defining the projections/predicates one
>>> wants
>>> >> >>> to perform, but it does let one indicate whether one supports a
>>> given
>>> >> >>> projection in code rather than having to come up with a language
>>> for
>>> >> >>> that as well. The primary advantage is that it allows one to act
>>> on
>>> >> >>> composites rather than at the level of primitives which is nice.
>>> >> >>> Whether this is necessary depends on how often full expansions
>>> (rather
>>> >> >>> than just the primitives) differ when a projection/predicate is
>>> pushed
>>> >> >>> down, and I have yet to come up with an example of that. (I think
>>> this
>>> >> >>> came up earlier, but our discussion could be informed by concrete
>>> >> >>> examples of 4-5 sources that support pushdown and provide
>>> sufficient
>>> >> >>> diversity.)
>>> >> >>>
>>> >> >>> I do think Option 3 could be a stepping stone to Option 4,
>>> viewing it
>>> >> >>> as a mechanism for "simple" pushdowns to be done inline rather
>>> than a
>>> >> >>> via remote execution.
>>> >> >>>
>>> >> >>> - Robert
>>> >> >>>
>>> >> >>>
>>> >> >>> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com>
>>> wrote:
>>> >> >>> >
>>> >> >>> > Oops, I accidentally hit the send button...
>>> >> >>> >
>>> >> >>> > As I was saying, I think that's an excellent idea. I discussed
>>> this with Andrew previously and we came to a similar conclusion, that
>>> re-expansion is more flexible than annotations, especially when it comes to
>>> predicate pushdown.
>>> >> >>> >
>>> >> >>> > Robert made a prototype [1] of Option 2 in the Python SDK that
>>> should answer some of your questions about that option. Specifically, for
>>> your requirement d), the SDK worker could apply the pushdown before
>>> processing. Since SDK workers are shared across runners, that would mean
>>> the individual runners themselves would not have to re-implement pushdown.
>>> This approach has other drawbacks, though, which I have enumerated in my
>>> design doc.
>>> >> >>> >
>>> >> >>> > [1] https://github.com/apache/beam/pull/15351
>>> >> >>> >
>>> >> >>> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <
>>> kcweaver@google.com> wrote:
>>> >> >>> >>
>>> >> >>> >> Hi Brian. I think that's an excele
>>> >> >>> >>
>>> >> >>> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <
>>> bhulette@google.com> wrote:
>>> >> >>> >>>
>>> >> >>> >>> Thanks for the proposal Kyle! I wanted to write up some
>>> thoughts around the portable proposal (Option 2), and it grew well beyond
>>> the size of a doc comment.
>>> >> >>> >>>
>>> >> >>> >>> The basic requirements for this work, which are satisfied by
>>> Option 1, are:
>>> >> >>> >>> - to allow pushdown into *Java SDK* sources
>>> >> >>> >>> - based on field accesses in *Java SDK* transforms
>>> >> >>> >>> - when building a pipeline with the *Java SDK*.
>>> >> >>> >>>
>>> >> >>> >>> However down the road I would really like to see all of those
>>> requirements go from *Java SDK* to *any SDK*. It doesn't need to happen
>>> right away, but it would be great to avoid going down a path that's not
>>> amenable to such an approach in the future. So it's worth thinking about
>>> what the "any SDK" approach will look like now. (Presumably others feel
>>> this way as well, thus Option 2).
>>> >> >>> >>>
>>> >> >>> >>> Thinking about a portable version, we can break down these
>>> requirements further:
>>> >> >>> >>> - To support pushdown into *any SDK* sources we need
>>> >> >>> >>>   a) a portable way to define (or discover) what projections
>>> a source is capable of
>>> >> >>> >>>   b) a portable way to modify the source to perform those
>>> projections
>>> >> >>> >>> - To support tracking field accesses in *any SDK* transforms
>>> we need
>>> >> >>> >>>   c) a portable way to annotate field accesses (this isn't so
>>> bad, we know how to annotate transforms, but the devil is in the details I
>>> suppose).
>>> >> >>> >>> - To support pushdown when building a pipeline with *any SDK*
>>> we need
>>> >> >>> >>>   d) some SDK-agnostic code for applying the pushdown
>>> >> >>> >>>
>>> >> >>> >>> I think (a,b,c) are solvable, but they depend on (d), which
>>> is a big unknown (to me). It looks like Option 2 pursues putting this
>>> pushdown code in runners, and having them operate on the Runner API protos
>>> directly. I have a few concerns with this:
>>> >> >>> >>> - It precludes having complex logic for (a,b) in the SDKs.
>>> This was touched on in your document already [1]. I don't think it's that
>>> problematic for projection, but as we look forward to predicate pushdown I
>>> think we'll need to allow complex logic in the SDK for discovering
>>> supported predicates (e.g. see the logic for predicate discovery in
>>> BigQuery [2]).
>>> >> >>> >>> - Will individual runners be responsible for implementing
>>> this? We could put the logic in runners-core-construction-java to re-use it
>>> across JVM-based runners, but what about others? Does this just move the
>>> code duplication from the SDK side to the runner side?
>>> >> >>> >>> - Finally, Option 1 is not really a stepping stone to this
>>> approach.
>>> >> >>> >>>
>>> >> >>> >>> An alternative portable approach might be to have an
>>> intermediate cross-SDK and cross-runner "optimizer", which (optionally)
>>> runs between the SDK and the runner. It could send expansion requests back
>>> to SDKs to "re-expand" sources with projections (and later predicates)
>>> applied. This would address all of the above:
>>> >> >>> >>> - Since we make expansion requests to apply projections, the
>>> SDKs can implement complex logic for (a,b).
>>> >> >>> >>> - The pushdown code isn't in a runner or an SDK, we'd only
>>> need one optimizer implementation.
>>> >> >>> >>> - Option 1 could be a stepping stone to this approach. The
>>> optimizer could be implemented with code from Option 1, if it's written
>>> with this in mind.
>>> >> >>> >>>
>>> >> >>> >>> This does have some downsides:
>>> >> >>> >>> - Sources are expanded twice. IO authors will need to avoid
>>> side effects in expansion methods, and any expensive logic will be executed
>>> twice.
>>> >> >>> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly
>>> separable though: if you don't want pushdown you don't need Java).
>>> >> >>> >>>
>>> >> >>> >>> I'm curious what others think about this approach. Is it
>>> tenable? Am I missing something?
>>> >> >>> >>>
>>> >> >>> >>> Thanks!
>>> >> >>> >>> Brian
>>> >> >>> >>>
>>> >> >>> >>> [1]
>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
>>> >> >>> >>> [2]
>>> https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>>> >> >>> >>>
>>> >> >>> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com>
>>> wrote:
>>> >> >>> >>>>
>>> >> >>> >>>> Thanks, I made a pass over Option 2
>>> >> >>> >>>>
>>> >> >>> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <
>>> kcweaver@google.com> wrote:
>>> >> >>> >>>>>
>>> >> >>> >>>>> Hi again everyone, thanks for all of your feedback. I've
>>> almost completely rewritten the document to include another design option
>>> and address more possibilities and challenges. Please take a look.
>>> >> >>> >>>>>
>>> >> >>> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <
>>> xinyuliu.us@gmail.com> wrote:
>>> >> >>> >>>>>>
>>> >> >>> >>>>>> Very happy to see we will have pushdown optimizations for
>>> java pipelines! Thanks for sharing the proposal.
>>> >> >>> >>>>>>
>>> >> >>> >>>>>> Thanks,
>>> >> >>> >>>>>> XInyu
>>> >> >>> >>>>>>
>>> >> >>> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>> >> >>> >>>>>>>
>>> >> >>> >>>>>>> Thanks Kyle, very promising. I left some comments.
>>> >> >>> >>>>>>>
>>> >> >>> >>>>>>> —
>>> >> >>> >>>>>>> Alexey
>>> >> >>> >>>>>>>
>>> >> >>> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com>
>>> wrote:
>>> >> >>> >>>>>>>
>>> >> >>> >>>>>>> Thanks, I took a look at it.
>>> >> >>> >>>>>>>
>>> >> >>> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <
>>> kcweaver@google.com> wrote:
>>> >> >>> >>>>>>>>
>>> >> >>> >>>>>>>> Hi Beam devs,
>>> >> >>> >>>>>>>>
>>> >> >>> >>>>>>>> I'm back with another proposal involving Schema-based
>>> optimization in the Beam Java SDK. This one builds on the ideas in my
>>> previous proposal and is broader in scope. Please leave comments if this
>>> area is of interest to you.
>>> >> >>> >>>>>>>>
>>> >> >>> >>>>>>>>
>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>>> >> >>> >>>>>>>>
>>> >> >>> >>>>>>>> Thank you,
>>> >> >>> >>>>>>>> Kyle
>>> >> >>> >>>>>>>
>>> >> >>> >>>>>>>
>>>
>>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Luke Cwik <lc...@google.com>.
Instead of having the source state its capabilities of what it supports, it
might be easier to say what v1 of pushdown is and then have sources be able
convert that spec into something it supports, possibly doing
simplifications over the expression.

For example, the spec could be FILTER with AND and OR. Then a specific
implementation could choose to not support OR and detect that OR is being
used and decide to perform the filtering after the read has been done while
if it got a filter with only ANDs then it could push it down. As long as we
provide a generic library to convert any filter expression into a function
that can be used to perform the filtering after the read has been done it
would be pretty easy for IOs to adopt support for whatever they can
actually do.

On Mon, Sep 20, 2021 at 4:10 PM Brian Hulette <bh...@google.com> wrote:

> The other drawback of Option 2 is that we need a portable way to encode
> information about projections supported by a source, i.e.:
>
>   // Describes which projections are supported by this ParDo.
>   message ProjectSupport {
>     // Whether projection is supported at all. If false, no projections
> are supported.
>     bool supports_projection = 1;
>     // Whether a projection that outputs fields in a different order than
> its input is supported.
>     bool supports_field_reordering = 2;
>   }
>
> This looks reasonable for projection but I don't think it's feasible
> looking forward to predicate pushdown, where there can be much
> more variability in what a source supports.
>
>
> On Mon, Sep 20, 2021 at 3:35 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Mon, Sep 20, 2021 at 3:26 PM Luke Cwik <lc...@google.com> wrote:
>> >
>> > We could make SDKs that support projection/pushdown/... be required to
>> expose an expansion like API as part of the SDK container. This way we
>> don't have to worry about the original expansion service that may have
>> produced the transforms in that case.
>>
>> Yes, as mentioned this would mean attaching environments to composites as
>> well.
>>
>> > As for "d) some SDK-agnostic code for applying the pushdown", can we
>> not have a runner say "filter on field X and Y" in an agnostic way as part
>> of the ProcessBundleDescriptor and then have the SDK apply the optimization
>> using SDK specific code? This way the SDK would be responsible for
>> converting the generic representation into the IO specific representation.
>>
>> That is precisely what my implementation does. The only missing bit is
>> the ability to provide alternative expansions for composite operations
>> (pre-fusion and other optimizations, of course). But given the
>> inventory of sources, I'm not convinced that's needed. (As we've
>> discussed, a runner-independent way of providing multiple alternative
>> expansions for a composite could be provided anyway.)
>
>
>> > On Thu, Sep 16, 2021 at 5:01 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>
>> >>  On Thu, Sep 16, 2021 at 3:45 PM Kyle Weaver <kc...@google.com>
>> wrote:
>> >> >>
>> >> >> I think this
>> >> >> came up earlier, but our discussion could be informed by concrete
>> >> >> examples of 4-5 sources that support pushdown and provide sufficient
>> >> >> diversity.
>> >> >
>> >> > I looked at the four IOs that support pushdown in BeamSqlTable:
>> BigQuery, BigTable, MongoDB, and Parquet. Of the four, only ParquetIO
>> expands into a simple ParDo(DoFn).
>> >>
>> >> Interesting. In Python ParquetIO is the complicated one that expands
>> >> into a Read.from(BoundedSource) followed by a DoFn.
>> >>
>> >> > The other three all expand into Read.from(BoundedSource).
>> >>
>> >> This is the nice, trivial case where there is no composite structure
>> >> to worry about. Do we have any examples where we'd need additional
>> >> complexity to be able to expand composites differently?
>> >>
>> >> > This introduces yet another layer of indirection. BoundedSource is
>> where we must receive the pushdown request, but the
>> BoundedSourceAsSDFWrapperFn is the actual DoFn, so we would need to pass
>> pushdown information from BoundedSourceAsSDFWrapperFn to BoundedSource.
>> >>
>> >> I don't think this extra level of indirection is a problem. It's what
>> >> one would expect of a shim whose purpose is to wrap one API in
>> >> another, and should be quite straightforward.
>> >>
>> >> > This is a symptom of a larger problem: option 3 requires plumbing
>> pushdown information through numerous layers of the Beam stack, from the
>> protos to the SDK worker to the SDK’s DoFn implementation, and finally all
>> the way through each IO implementation. In contrast, option 1 is entirely
>> self-contained. It requires one method for pipeline optimization, and one
>> trivial withProjectionPushdown method per IO.
>> >>
>> >> It's true that an option scoped to a single language/SDK can be more
>> >> self-contained, but the whole point is that we don't want to be so
>> >> self-contained. Anything that operates at a language-agnostic level
>> >> requires plumbing between the user code and the protos, but this is
>> >> O(1) work and you're done (<200 lines for Python) and are back to just
>> >> writing a projection method per IO (or, at most, per step in the IO,
>> >> though it looks like that's usually 1 per the four sources mentioned
>> >> above).
>> >>
>> >>
>> >> > Whether option 1 is a stepping stone to the portable/cross-language
>> solution remains to be seen; it depends on the portable/cross-language
>> solution we end up choosing. But it's the simplest solution to the problem
>> I set out to solve.
>> >> >
>> >> >
>> >> >
>> >> > On Thu, Sep 16, 2021 at 12:59 PM Brian Hulette <bh...@google.com>
>> wrote:
>> >> >>>
>> >> >>> This would be a little bit like the environment
>> >> >>> (though it would also get attached to composites); maybe a
>> reference
>> >> >>> to docker image that could get started up and communicated with?
>> >> >>
>> >> >>
>> >> >> I was thinking along similar lines. The idea I had was that
>> composites from the non-submitting SDK could be annotated with the
>> expansion service that they came from, and we'd have a "loopback" service
>> for the submitting SDK transforms. But that would require that the
>> expansion services are still running, or we include enough information to
>> start them up again.
>> >> >>
>> >> >>> Note that this does not obviate the need to come up with some
>> >> >>> language-agnostic way of defining the projections/predicates one
>> wants
>> >> >>> to perform, but it does let one indicate whether one supports a
>> given
>> >> >>> projection in code rather than having to come up with a language
>> for
>> >> >>> that as well. The primary advantage is that it allows one to act on
>> >> >>> composites rather than at the level of primitives which is nice.
>> >> >>> Whether this is necessary depends on how often full expansions
>> (rather
>> >> >>> than just the primitives) differ when a projection/predicate is
>> pushed
>> >> >>> down, and I have yet to come up with an example of that. (I think
>> this
>> >> >>> came up earlier, but our discussion could be informed by concrete
>> >> >>> examples of 4-5 sources that support pushdown and provide
>> sufficient
>> >> >>> diversity.)
>> >> >>
>> >> >>
>> >> >> Thanks, this articulates the advantages well. I don't think we can
>> avoid having a language-agnostic way to represent projections/predicates
>> (there may be something open source we can leverage or at least model here,
>> e.g. I'm keeping an eye on Substrait [1,2]).
>> >> >> I think you're probably right that it's rare a source with a
>> pushdown applied will need a different set of primitives. So the biggest
>> advantage of Option 4 is that it lets you articulate supported pushdowns in
>> code. If we create an entirely new "pushdown" service, maybe it could just
>> handle that instead of doing the full expansion.
>> >> >>
>> >> >> [1] https://substrait.io/
>> >> >> [2]
>> https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E
>> >> >>
>> >> >> On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >> >>>
>> >> >>> The re-expansion idea is an interesting one. Let's call this
>> Option 4.
>> >> >>> The main question is how it would be invoked.
>> >> >>>
>> >> >>> As well as adding complexity, I don't think it would work to do it
>> as
>> >> >>> part of the SDK's submission to the runner. For one thing, it'd be
>> >> >>> best if we could just fire-and-forget, even writing the spec to
>> disk
>> >> >>> and then having the runner pick it up later, and we'd have to
>> figure
>> >> >>> out how to make this work with interactive and similar services
>> that
>> >> >>> act as runners but are really proxies for other runners. The other
>> >> >>> problem is that the submitting SDK may not be the SDK that
>> generated
>> >> >>> the transform.
>> >> >>>
>> >> >>> The other option would be to have some kind of a "pushdown service"
>> >> >>> that would be referenced on the transform and could be invoked to
>> do
>> >> >>> the re-expansion. This would be a little bit like the environment
>> >> >>> (though it would also get attached to composites); maybe a
>> reference
>> >> >>> to docker image that could get started up and communicated with?
>> >> >>> (Maybe the SDK image itself could be used with an alternative
>> >> >>> entrypoint? Assuming docker-on-docker works well...) Might get a
>> bit
>> >> >>> heavy-weight, but maybe we make it optional (though I'd rather it
>> be
>> >> >>> something you get by default).
>> >> >>>
>> >> >>> Definitely worth expanding more.
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> My primary concern with Option 1 has been that it does not seem a
>> >> >>> stepping stone for anything portable/multi-language, and as we'll
>> >> >>> definitely want to support that (IMHO sooner rather than later) I
>> >> >>> think whatever we build should go in that direction.
>> >> >>>
>> >> >>> Note that this does not obviate the need to come up with some
>> >> >>> language-agnostic way of defining the projections/predicates one
>> wants
>> >> >>> to perform, but it does let one indicate whether one supports a
>> given
>> >> >>> projection in code rather than having to come up with a language
>> for
>> >> >>> that as well. The primary advantage is that it allows one to act on
>> >> >>> composites rather than at the level of primitives which is nice.
>> >> >>> Whether this is necessary depends on how often full expansions
>> (rather
>> >> >>> than just the primitives) differ when a projection/predicate is
>> pushed
>> >> >>> down, and I have yet to come up with an example of that. (I think
>> this
>> >> >>> came up earlier, but our discussion could be informed by concrete
>> >> >>> examples of 4-5 sources that support pushdown and provide
>> sufficient
>> >> >>> diversity.)
>> >> >>>
>> >> >>> I do think Option 3 could be a stepping stone to Option 4, viewing
>> it
>> >> >>> as a mechanism for "simple" pushdowns to be done inline rather
>> than a
>> >> >>> via remote execution.
>> >> >>>
>> >> >>> - Robert
>> >> >>>
>> >> >>>
>> >> >>> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com>
>> wrote:
>> >> >>> >
>> >> >>> > Oops, I accidentally hit the send button...
>> >> >>> >
>> >> >>> > As I was saying, I think that's an excellent idea. I discussed
>> this with Andrew previously and we came to a similar conclusion, that
>> re-expansion is more flexible than annotations, especially when it comes to
>> predicate pushdown.
>> >> >>> >
>> >> >>> > Robert made a prototype [1] of Option 2 in the Python SDK that
>> should answer some of your questions about that option. Specifically, for
>> your requirement d), the SDK worker could apply the pushdown before
>> processing. Since SDK workers are shared across runners, that would mean
>> the individual runners themselves would not have to re-implement pushdown.
>> This approach has other drawbacks, though, which I have enumerated in my
>> design doc.
>> >> >>> >
>> >> >>> > [1] https://github.com/apache/beam/pull/15351
>> >> >>> >
>> >> >>> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com>
>> wrote:
>> >> >>> >>
>> >> >>> >> Hi Brian. I think that's an excele
>> >> >>> >>
>> >> >>> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <
>> bhulette@google.com> wrote:
>> >> >>> >>>
>> >> >>> >>> Thanks for the proposal Kyle! I wanted to write up some
>> thoughts around the portable proposal (Option 2), and it grew well beyond
>> the size of a doc comment.
>> >> >>> >>>
>> >> >>> >>> The basic requirements for this work, which are satisfied by
>> Option 1, are:
>> >> >>> >>> - to allow pushdown into *Java SDK* sources
>> >> >>> >>> - based on field accesses in *Java SDK* transforms
>> >> >>> >>> - when building a pipeline with the *Java SDK*.
>> >> >>> >>>
>> >> >>> >>> However down the road I would really like to see all of those
>> requirements go from *Java SDK* to *any SDK*. It doesn't need to happen
>> right away, but it would be great to avoid going down a path that's not
>> amenable to such an approach in the future. So it's worth thinking about
>> what the "any SDK" approach will look like now. (Presumably others feel
>> this way as well, thus Option 2).
>> >> >>> >>>
>> >> >>> >>> Thinking about a portable version, we can break down these
>> requirements further:
>> >> >>> >>> - To support pushdown into *any SDK* sources we need
>> >> >>> >>>   a) a portable way to define (or discover) what projections a
>> source is capable of
>> >> >>> >>>   b) a portable way to modify the source to perform those
>> projections
>> >> >>> >>> - To support tracking field accesses in *any SDK* transforms
>> we need
>> >> >>> >>>   c) a portable way to annotate field accesses (this isn't so
>> bad, we know how to annotate transforms, but the devil is in the details I
>> suppose).
>> >> >>> >>> - To support pushdown when building a pipeline with *any SDK*
>> we need
>> >> >>> >>>   d) some SDK-agnostic code for applying the pushdown
>> >> >>> >>>
>> >> >>> >>> I think (a,b,c) are solvable, but they depend on (d), which is
>> a big unknown (to me). It looks like Option 2 pursues putting this pushdown
>> code in runners, and having them operate on the Runner API protos directly.
>> I have a few concerns with this:
>> >> >>> >>> - It precludes having complex logic for (a,b) in the SDKs.
>> This was touched on in your document already [1]. I don't think it's that
>> problematic for projection, but as we look forward to predicate pushdown I
>> think we'll need to allow complex logic in the SDK for discovering
>> supported predicates (e.g. see the logic for predicate discovery in
>> BigQuery [2]).
>> >> >>> >>> - Will individual runners be responsible for implementing
>> this? We could put the logic in runners-core-construction-java to re-use it
>> across JVM-based runners, but what about others? Does this just move the
>> code duplication from the SDK side to the runner side?
>> >> >>> >>> - Finally, Option 1 is not really a stepping stone to this
>> approach.
>> >> >>> >>>
>> >> >>> >>> An alternative portable approach might be to have an
>> intermediate cross-SDK and cross-runner "optimizer", which (optionally)
>> runs between the SDK and the runner. It could send expansion requests back
>> to SDKs to "re-expand" sources with projections (and later predicates)
>> applied. This would address all of the above:
>> >> >>> >>> - Since we make expansion requests to apply projections, the
>> SDKs can implement complex logic for (a,b).
>> >> >>> >>> - The pushdown code isn't in a runner or an SDK, we'd only
>> need one optimizer implementation.
>> >> >>> >>> - Option 1 could be a stepping stone to this approach. The
>> optimizer could be implemented with code from Option 1, if it's written
>> with this in mind.
>> >> >>> >>>
>> >> >>> >>> This does have some downsides:
>> >> >>> >>> - Sources are expanded twice. IO authors will need to avoid
>> side effects in expansion methods, and any expensive logic will be executed
>> twice.
>> >> >>> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly
>> separable though: if you don't want pushdown you don't need Java).
>> >> >>> >>>
>> >> >>> >>> I'm curious what others think about this approach. Is it
>> tenable? Am I missing something?
>> >> >>> >>>
>> >> >>> >>> Thanks!
>> >> >>> >>> Brian
>> >> >>> >>>
>> >> >>> >>> [1]
>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
>> >> >>> >>> [2]
>> https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>> >> >>> >>>
>> >> >>> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com>
>> wrote:
>> >> >>> >>>>
>> >> >>> >>>> Thanks, I made a pass over Option 2
>> >> >>> >>>>
>> >> >>> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <
>> kcweaver@google.com> wrote:
>> >> >>> >>>>>
>> >> >>> >>>>> Hi again everyone, thanks for all of your feedback. I've
>> almost completely rewritten the document to include another design option
>> and address more possibilities and challenges. Please take a look.
>> >> >>> >>>>>
>> >> >>> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <
>> xinyuliu.us@gmail.com> wrote:
>> >> >>> >>>>>>
>> >> >>> >>>>>> Very happy to see we will have pushdown optimizations for
>> java pipelines! Thanks for sharing the proposal.
>> >> >>> >>>>>>
>> >> >>> >>>>>> Thanks,
>> >> >>> >>>>>> XInyu
>> >> >>> >>>>>>
>> >> >>> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>> >> >>> >>>>>>>
>> >> >>> >>>>>>> Thanks Kyle, very promising. I left some comments.
>> >> >>> >>>>>>>
>> >> >>> >>>>>>> —
>> >> >>> >>>>>>> Alexey
>> >> >>> >>>>>>>
>> >> >>> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com>
>> wrote:
>> >> >>> >>>>>>>
>> >> >>> >>>>>>> Thanks, I took a look at it.
>> >> >>> >>>>>>>
>> >> >>> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <
>> kcweaver@google.com> wrote:
>> >> >>> >>>>>>>>
>> >> >>> >>>>>>>> Hi Beam devs,
>> >> >>> >>>>>>>>
>> >> >>> >>>>>>>> I'm back with another proposal involving Schema-based
>> optimization in the Beam Java SDK. This one builds on the ideas in my
>> previous proposal and is broader in scope. Please leave comments if this
>> area is of interest to you.
>> >> >>> >>>>>>>>
>> >> >>> >>>>>>>>
>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>> >> >>> >>>>>>>>
>> >> >>> >>>>>>>> Thank you,
>> >> >>> >>>>>>>> Kyle
>> >> >>> >>>>>>>
>> >> >>> >>>>>>>
>>
>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Brian Hulette <bh...@google.com>.
The other drawback of Option 2 is that we need a portable way to encode
information about projections supported by a source, i.e.:

  // Describes which projections are supported by this ParDo.
  message ProjectSupport {
    // Whether projection is supported at all. If false, no projections are
supported.
    bool supports_projection = 1;
    // Whether a projection that outputs fields in a different order than
its input is supported.
    bool supports_field_reordering = 2;
  }

This looks reasonable for projection but I don't think it's feasible
looking forward to predicate pushdown, where there can be much
more variability in what a source supports.


On Mon, Sep 20, 2021 at 3:35 PM Robert Bradshaw <ro...@google.com> wrote:

> On Mon, Sep 20, 2021 at 3:26 PM Luke Cwik <lc...@google.com> wrote:
> >
> > We could make SDKs that support projection/pushdown/... be required to
> expose an expansion like API as part of the SDK container. This way we
> don't have to worry about the original expansion service that may have
> produced the transforms in that case.
>
> Yes, as mentioned this would mean attaching environments to composites as
> well.
>
> > As for "d) some SDK-agnostic code for applying the pushdown", can we not
> have a runner say "filter on field X and Y" in an agnostic way as part of
> the ProcessBundleDescriptor and then have the SDK apply the optimization
> using SDK specific code? This way the SDK would be responsible for
> converting the generic representation into the IO specific representation.
>
> That is precisely what my implementation does. The only missing bit is
> the ability to provide alternative expansions for composite operations
> (pre-fusion and other optimizations, of course). But given the
> inventory of sources, I'm not convinced that's needed. (As we've
> discussed, a runner-independent way of providing multiple alternative
> expansions for a composite could be provided anyway.)


> > On Thu, Sep 16, 2021 at 5:01 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >>  On Thu, Sep 16, 2021 at 3:45 PM Kyle Weaver <kc...@google.com>
> wrote:
> >> >>
> >> >> I think this
> >> >> came up earlier, but our discussion could be informed by concrete
> >> >> examples of 4-5 sources that support pushdown and provide sufficient
> >> >> diversity.
> >> >
> >> > I looked at the four IOs that support pushdown in BeamSqlTable:
> BigQuery, BigTable, MongoDB, and Parquet. Of the four, only ParquetIO
> expands into a simple ParDo(DoFn).
> >>
> >> Interesting. In Python ParquetIO is the complicated one that expands
> >> into a Read.from(BoundedSource) followed by a DoFn.
> >>
> >> > The other three all expand into Read.from(BoundedSource).
> >>
> >> This is the nice, trivial case where there is no composite structure
> >> to worry about. Do we have any examples where we'd need additional
> >> complexity to be able to expand composites differently?
> >>
> >> > This introduces yet another layer of indirection. BoundedSource is
> where we must receive the pushdown request, but the
> BoundedSourceAsSDFWrapperFn is the actual DoFn, so we would need to pass
> pushdown information from BoundedSourceAsSDFWrapperFn to BoundedSource.
> >>
> >> I don't think this extra level of indirection is a problem. It's what
> >> one would expect of a shim whose purpose is to wrap one API in
> >> another, and should be quite straightforward.
> >>
> >> > This is a symptom of a larger problem: option 3 requires plumbing
> pushdown information through numerous layers of the Beam stack, from the
> protos to the SDK worker to the SDK’s DoFn implementation, and finally all
> the way through each IO implementation. In contrast, option 1 is entirely
> self-contained. It requires one method for pipeline optimization, and one
> trivial withProjectionPushdown method per IO.
> >>
> >> It's true that an option scoped to a single language/SDK can be more
> >> self-contained, but the whole point is that we don't want to be so
> >> self-contained. Anything that operates at a language-agnostic level
> >> requires plumbing between the user code and the protos, but this is
> >> O(1) work and you're done (<200 lines for Python) and are back to just
> >> writing a projection method per IO (or, at most, per step in the IO,
> >> though it looks like that's usually 1 per the four sources mentioned
> >> above).
> >>
> >>
> >> > Whether option 1 is a stepping stone to the portable/cross-language
> solution remains to be seen; it depends on the portable/cross-language
> solution we end up choosing. But it's the simplest solution to the problem
> I set out to solve.
> >> >
> >> >
> >> >
> >> > On Thu, Sep 16, 2021 at 12:59 PM Brian Hulette <bh...@google.com>
> wrote:
> >> >>>
> >> >>> This would be a little bit like the environment
> >> >>> (though it would also get attached to composites); maybe a reference
> >> >>> to docker image that could get started up and communicated with?
> >> >>
> >> >>
> >> >> I was thinking along similar lines. The idea I had was that
> composites from the non-submitting SDK could be annotated with the
> expansion service that they came from, and we'd have a "loopback" service
> for the submitting SDK transforms. But that would require that the
> expansion services are still running, or we include enough information to
> start them up again.
> >> >>
> >> >>> Note that this does not obviate the need to come up with some
> >> >>> language-agnostic way of defining the projections/predicates one
> wants
> >> >>> to perform, but it does let one indicate whether one supports a
> given
> >> >>> projection in code rather than having to come up with a language for
> >> >>> that as well. The primary advantage is that it allows one to act on
> >> >>> composites rather than at the level of primitives which is nice.
> >> >>> Whether this is necessary depends on how often full expansions
> (rather
> >> >>> than just the primitives) differ when a projection/predicate is
> pushed
> >> >>> down, and I have yet to come up with an example of that. (I think
> this
> >> >>> came up earlier, but our discussion could be informed by concrete
> >> >>> examples of 4-5 sources that support pushdown and provide sufficient
> >> >>> diversity.)
> >> >>
> >> >>
> >> >> Thanks, this articulates the advantages well. I don't think we can
> avoid having a language-agnostic way to represent projections/predicates
> (there may be something open source we can leverage or at least model here,
> e.g. I'm keeping an eye on Substrait [1,2]).
> >> >> I think you're probably right that it's rare a source with a
> pushdown applied will need a different set of primitives. So the biggest
> advantage of Option 4 is that it lets you articulate supported pushdowns in
> code. If we create an entirely new "pushdown" service, maybe it could just
> handle that instead of doing the full expansion.
> >> >>
> >> >> [1] https://substrait.io/
> >> >> [2]
> https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E
> >> >>
> >> >> On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >> >>>
> >> >>> The re-expansion idea is an interesting one. Let's call this Option
> 4.
> >> >>> The main question is how it would be invoked.
> >> >>>
> >> >>> As well as adding complexity, I don't think it would work to do it
> as
> >> >>> part of the SDK's submission to the runner. For one thing, it'd be
> >> >>> best if we could just fire-and-forget, even writing the spec to disk
> >> >>> and then having the runner pick it up later, and we'd have to figure
> >> >>> out how to make this work with interactive and similar services that
> >> >>> act as runners but are really proxies for other runners. The other
> >> >>> problem is that the submitting SDK may not be the SDK that generated
> >> >>> the transform.
> >> >>>
> >> >>> The other option would be to have some kind of a "pushdown service"
> >> >>> that would be referenced on the transform and could be invoked to do
> >> >>> the re-expansion. This would be a little bit like the environment
> >> >>> (though it would also get attached to composites); maybe a reference
> >> >>> to docker image that could get started up and communicated with?
> >> >>> (Maybe the SDK image itself could be used with an alternative
> >> >>> entrypoint? Assuming docker-on-docker works well...) Might get a bit
> >> >>> heavy-weight, but maybe we make it optional (though I'd rather it be
> >> >>> something you get by default).
> >> >>>
> >> >>> Definitely worth expanding more.
> >> >>>
> >> >>>
> >> >>>
> >> >>> My primary concern with Option 1 has been that it does not seem a
> >> >>> stepping stone for anything portable/multi-language, and as we'll
> >> >>> definitely want to support that (IMHO sooner rather than later) I
> >> >>> think whatever we build should go in that direction.
> >> >>>
> >> >>> Note that this does not obviate the need to come up with some
> >> >>> language-agnostic way of defining the projections/predicates one
> wants
> >> >>> to perform, but it does let one indicate whether one supports a
> given
> >> >>> projection in code rather than having to come up with a language for
> >> >>> that as well. The primary advantage is that it allows one to act on
> >> >>> composites rather than at the level of primitives which is nice.
> >> >>> Whether this is necessary depends on how often full expansions
> (rather
> >> >>> than just the primitives) differ when a projection/predicate is
> pushed
> >> >>> down, and I have yet to come up with an example of that. (I think
> this
> >> >>> came up earlier, but our discussion could be informed by concrete
> >> >>> examples of 4-5 sources that support pushdown and provide sufficient
> >> >>> diversity.)
> >> >>>
> >> >>> I do think Option 3 could be a stepping stone to Option 4, viewing
> it
> >> >>> as a mechanism for "simple" pushdowns to be done inline rather than
> a
> >> >>> via remote execution.
> >> >>>
> >> >>> - Robert
> >> >>>
> >> >>>
> >> >>> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com>
> wrote:
> >> >>> >
> >> >>> > Oops, I accidentally hit the send button...
> >> >>> >
> >> >>> > As I was saying, I think that's an excellent idea. I discussed
> this with Andrew previously and we came to a similar conclusion, that
> re-expansion is more flexible than annotations, especially when it comes to
> predicate pushdown.
> >> >>> >
> >> >>> > Robert made a prototype [1] of Option 2 in the Python SDK that
> should answer some of your questions about that option. Specifically, for
> your requirement d), the SDK worker could apply the pushdown before
> processing. Since SDK workers are shared across runners, that would mean
> the individual runners themselves would not have to re-implement pushdown.
> This approach has other drawbacks, though, which I have enumerated in my
> design doc.
> >> >>> >
> >> >>> > [1] https://github.com/apache/beam/pull/15351
> >> >>> >
> >> >>> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com>
> wrote:
> >> >>> >>
> >> >>> >> Hi Brian. I think that's an excele
> >> >>> >>
> >> >>> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <
> bhulette@google.com> wrote:
> >> >>> >>>
> >> >>> >>> Thanks for the proposal Kyle! I wanted to write up some
> thoughts around the portable proposal (Option 2), and it grew well beyond
> the size of a doc comment.
> >> >>> >>>
> >> >>> >>> The basic requirements for this work, which are satisfied by
> Option 1, are:
> >> >>> >>> - to allow pushdown into *Java SDK* sources
> >> >>> >>> - based on field accesses in *Java SDK* transforms
> >> >>> >>> - when building a pipeline with the *Java SDK*.
> >> >>> >>>
> >> >>> >>> However down the road I would really like to see all of those
> requirements go from *Java SDK* to *any SDK*. It doesn't need to happen
> right away, but it would be great to avoid going down a path that's not
> amenable to such an approach in the future. So it's worth thinking about
> what the "any SDK" approach will look like now. (Presumably others feel
> this way as well, thus Option 2).
> >> >>> >>>
> >> >>> >>> Thinking about a portable version, we can break down these
> requirements further:
> >> >>> >>> - To support pushdown into *any SDK* sources we need
> >> >>> >>>   a) a portable way to define (or discover) what projections a
> source is capable of
> >> >>> >>>   b) a portable way to modify the source to perform those
> projections
> >> >>> >>> - To support tracking field accesses in *any SDK* transforms we
> need
> >> >>> >>>   c) a portable way to annotate field accesses (this isn't so
> bad, we know how to annotate transforms, but the devil is in the details I
> suppose).
> >> >>> >>> - To support pushdown when building a pipeline with *any SDK*
> we need
> >> >>> >>>   d) some SDK-agnostic code for applying the pushdown
> >> >>> >>>
> >> >>> >>> I think (a,b,c) are solvable, but they depend on (d), which is
> a big unknown (to me). It looks like Option 2 pursues putting this pushdown
> code in runners, and having them operate on the Runner API protos directly.
> I have a few concerns with this:
> >> >>> >>> - It precludes having complex logic for (a,b) in the SDKs. This
> was touched on in your document already [1]. I don't think it's that
> problematic for projection, but as we look forward to predicate pushdown I
> think we'll need to allow complex logic in the SDK for discovering
> supported predicates (e.g. see the logic for predicate discovery in
> BigQuery [2]).
> >> >>> >>> - Will individual runners be responsible for implementing this?
> We could put the logic in runners-core-construction-java to re-use it
> across JVM-based runners, but what about others? Does this just move the
> code duplication from the SDK side to the runner side?
> >> >>> >>> - Finally, Option 1 is not really a stepping stone to this
> approach.
> >> >>> >>>
> >> >>> >>> An alternative portable approach might be to have an
> intermediate cross-SDK and cross-runner "optimizer", which (optionally)
> runs between the SDK and the runner. It could send expansion requests back
> to SDKs to "re-expand" sources with projections (and later predicates)
> applied. This would address all of the above:
> >> >>> >>> - Since we make expansion requests to apply projections, the
> SDKs can implement complex logic for (a,b).
> >> >>> >>> - The pushdown code isn't in a runner or an SDK, we'd only need
> one optimizer implementation.
> >> >>> >>> - Option 1 could be a stepping stone to this approach. The
> optimizer could be implemented with code from Option 1, if it's written
> with this in mind.
> >> >>> >>>
> >> >>> >>> This does have some downsides:
> >> >>> >>> - Sources are expanded twice. IO authors will need to avoid
> side effects in expansion methods, and any expensive logic will be executed
> twice.
> >> >>> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly
> separable though: if you don't want pushdown you don't need Java).
> >> >>> >>>
> >> >>> >>> I'm curious what others think about this approach. Is it
> tenable? Am I missing something?
> >> >>> >>>
> >> >>> >>> Thanks!
> >> >>> >>> Brian
> >> >>> >>>
> >> >>> >>> [1]
> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
> >> >>> >>> [2]
> https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
> >> >>> >>>
> >> >>> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com>
> wrote:
> >> >>> >>>>
> >> >>> >>>> Thanks, I made a pass over Option 2
> >> >>> >>>>
> >> >>> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <
> kcweaver@google.com> wrote:
> >> >>> >>>>>
> >> >>> >>>>> Hi again everyone, thanks for all of your feedback. I've
> almost completely rewritten the document to include another design option
> and address more possibilities and challenges. Please take a look.
> >> >>> >>>>>
> >> >>> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <
> xinyuliu.us@gmail.com> wrote:
> >> >>> >>>>>>
> >> >>> >>>>>> Very happy to see we will have pushdown optimizations for
> java pipelines! Thanks for sharing the proposal.
> >> >>> >>>>>>
> >> >>> >>>>>> Thanks,
> >> >>> >>>>>> XInyu
> >> >>> >>>>>>
> >> >>> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
> >> >>> >>>>>>>
> >> >>> >>>>>>> Thanks Kyle, very promising. I left some comments.
> >> >>> >>>>>>>
> >> >>> >>>>>>> —
> >> >>> >>>>>>> Alexey
> >> >>> >>>>>>>
> >> >>> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com>
> wrote:
> >> >>> >>>>>>>
> >> >>> >>>>>>> Thanks, I took a look at it.
> >> >>> >>>>>>>
> >> >>> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <
> kcweaver@google.com> wrote:
> >> >>> >>>>>>>>
> >> >>> >>>>>>>> Hi Beam devs,
> >> >>> >>>>>>>>
> >> >>> >>>>>>>> I'm back with another proposal involving Schema-based
> optimization in the Beam Java SDK. This one builds on the ideas in my
> previous proposal and is broader in scope. Please leave comments if this
> area is of interest to you.
> >> >>> >>>>>>>>
> >> >>> >>>>>>>>
> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
> >> >>> >>>>>>>>
> >> >>> >>>>>>>> Thank you,
> >> >>> >>>>>>>> Kyle
> >> >>> >>>>>>>
> >> >>> >>>>>>>
>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Sep 20, 2021 at 3:26 PM Luke Cwik <lc...@google.com> wrote:
>
> We could make SDKs that support projection/pushdown/... be required to expose an expansion like API as part of the SDK container. This way we don't have to worry about the original expansion service that may have produced the transforms in that case.

Yes, as mentioned this would mean attaching environments to composites as well.

> As for "d) some SDK-agnostic code for applying the pushdown", can we not have a runner say "filter on field X and Y" in an agnostic way as part of the ProcessBundleDescriptor and then have the SDK apply the optimization using SDK specific code? This way the SDK would be responsible for converting the generic representation into the IO specific representation.

That is precisely what my implementation does. The only missing bit is
the ability to provide alternative expansions for composite operations
(pre-fusion and other optimizations, of course). But given the
inventory of sources, I'm not convinced that's needed. (As we've
discussed, a runner-independent way of providing multiple alternative
expansions for a composite could be provided anyway.)

> On Thu, Sep 16, 2021 at 5:01 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>>  On Thu, Sep 16, 2021 at 3:45 PM Kyle Weaver <kc...@google.com> wrote:
>> >>
>> >> I think this
>> >> came up earlier, but our discussion could be informed by concrete
>> >> examples of 4-5 sources that support pushdown and provide sufficient
>> >> diversity.
>> >
>> > I looked at the four IOs that support pushdown in BeamSqlTable: BigQuery, BigTable, MongoDB, and Parquet. Of the four, only ParquetIO expands into a simple ParDo(DoFn).
>>
>> Interesting. In Python ParquetIO is the complicated one that expands
>> into a Read.from(BoundedSource) followed by a DoFn.
>>
>> > The other three all expand into Read.from(BoundedSource).
>>
>> This is the nice, trivial case where there is no composite structure
>> to worry about. Do we have any examples where we'd need additional
>> complexity to be able to expand composites differently?
>>
>> > This introduces yet another layer of indirection. BoundedSource is where we must receive the pushdown request, but the BoundedSourceAsSDFWrapperFn is the actual DoFn, so we would need to pass pushdown information from BoundedSourceAsSDFWrapperFn to BoundedSource.
>>
>> I don't think this extra level of indirection is a problem. It's what
>> one would expect of a shim whose purpose is to wrap one API in
>> another, and should be quite straightforward.
>>
>> > This is a symptom of a larger problem: option 3 requires plumbing pushdown information through numerous layers of the Beam stack, from the protos to the SDK worker to the SDK’s DoFn implementation, and finally all the way through each IO implementation. In contrast, option 1 is entirely self-contained. It requires one method for pipeline optimization, and one trivial withProjectionPushdown method per IO.
>>
>> It's true that an option scoped to a single language/SDK can be more
>> self-contained, but the whole point is that we don't want to be so
>> self-contained. Anything that operates at a language-agnostic level
>> requires plumbing between the user code and the protos, but this is
>> O(1) work and you're done (<200 lines for Python) and are back to just
>> writing a projection method per IO (or, at most, per step in the IO,
>> though it looks like that's usually 1 per the four sources mentioned
>> above).
>>
>>
>> > Whether option 1 is a stepping stone to the portable/cross-language solution remains to be seen; it depends on the portable/cross-language solution we end up choosing. But it's the simplest solution to the problem I set out to solve.
>> >
>> >
>> >
>> > On Thu, Sep 16, 2021 at 12:59 PM Brian Hulette <bh...@google.com> wrote:
>> >>>
>> >>> This would be a little bit like the environment
>> >>> (though it would also get attached to composites); maybe a reference
>> >>> to docker image that could get started up and communicated with?
>> >>
>> >>
>> >> I was thinking along similar lines. The idea I had was that composites from the non-submitting SDK could be annotated with the expansion service that they came from, and we'd have a "loopback" service for the submitting SDK transforms. But that would require that the expansion services are still running, or we include enough information to start them up again.
>> >>
>> >>> Note that this does not obviate the need to come up with some
>> >>> language-agnostic way of defining the projections/predicates one wants
>> >>> to perform, but it does let one indicate whether one supports a given
>> >>> projection in code rather than having to come up with a language for
>> >>> that as well. The primary advantage is that it allows one to act on
>> >>> composites rather than at the level of primitives which is nice.
>> >>> Whether this is necessary depends on how often full expansions (rather
>> >>> than just the primitives) differ when a projection/predicate is pushed
>> >>> down, and I have yet to come up with an example of that. (I think this
>> >>> came up earlier, but our discussion could be informed by concrete
>> >>> examples of 4-5 sources that support pushdown and provide sufficient
>> >>> diversity.)
>> >>
>> >>
>> >> Thanks, this articulates the advantages well. I don't think we can avoid having a language-agnostic way to represent projections/predicates (there may be something open source we can leverage or at least model here, e.g. I'm keeping an eye on Substrait [1,2]).
>> >> I think you're probably right that it's rare a source with a pushdown applied will need a different set of primitives. So the biggest advantage of Option 4 is that it lets you articulate supported pushdowns in code. If we create an entirely new "pushdown" service, maybe it could just handle that instead of doing the full expansion.
>> >>
>> >> [1] https://substrait.io/
>> >> [2] https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E
>> >>
>> >> On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <ro...@google.com> wrote:
>> >>>
>> >>> The re-expansion idea is an interesting one. Let's call this Option 4.
>> >>> The main question is how it would be invoked.
>> >>>
>> >>> As well as adding complexity, I don't think it would work to do it as
>> >>> part of the SDK's submission to the runner. For one thing, it'd be
>> >>> best if we could just fire-and-forget, even writing the spec to disk
>> >>> and then having the runner pick it up later, and we'd have to figure
>> >>> out how to make this work with interactive and similar services that
>> >>> act as runners but are really proxies for other runners. The other
>> >>> problem is that the submitting SDK may not be the SDK that generated
>> >>> the transform.
>> >>>
>> >>> The other option would be to have some kind of a "pushdown service"
>> >>> that would be referenced on the transform and could be invoked to do
>> >>> the re-expansion. This would be a little bit like the environment
>> >>> (though it would also get attached to composites); maybe a reference
>> >>> to docker image that could get started up and communicated with?
>> >>> (Maybe the SDK image itself could be used with an alternative
>> >>> entrypoint? Assuming docker-on-docker works well...) Might get a bit
>> >>> heavy-weight, but maybe we make it optional (though I'd rather it be
>> >>> something you get by default).
>> >>>
>> >>> Definitely worth expanding more.
>> >>>
>> >>>
>> >>>
>> >>> My primary concern with Option 1 has been that it does not seem a
>> >>> stepping stone for anything portable/multi-language, and as we'll
>> >>> definitely want to support that (IMHO sooner rather than later) I
>> >>> think whatever we build should go in that direction.
>> >>>
>> >>> Note that this does not obviate the need to come up with some
>> >>> language-agnostic way of defining the projections/predicates one wants
>> >>> to perform, but it does let one indicate whether one supports a given
>> >>> projection in code rather than having to come up with a language for
>> >>> that as well. The primary advantage is that it allows one to act on
>> >>> composites rather than at the level of primitives which is nice.
>> >>> Whether this is necessary depends on how often full expansions (rather
>> >>> than just the primitives) differ when a projection/predicate is pushed
>> >>> down, and I have yet to come up with an example of that. (I think this
>> >>> came up earlier, but our discussion could be informed by concrete
>> >>> examples of 4-5 sources that support pushdown and provide sufficient
>> >>> diversity.)
>> >>>
>> >>> I do think Option 3 could be a stepping stone to Option 4, viewing it
>> >>> as a mechanism for "simple" pushdowns to be done inline rather than a
>> >>> via remote execution.
>> >>>
>> >>> - Robert
>> >>>
>> >>>
>> >>> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com> wrote:
>> >>> >
>> >>> > Oops, I accidentally hit the send button...
>> >>> >
>> >>> > As I was saying, I think that's an excellent idea. I discussed this with Andrew previously and we came to a similar conclusion, that re-expansion is more flexible than annotations, especially when it comes to predicate pushdown.
>> >>> >
>> >>> > Robert made a prototype [1] of Option 2 in the Python SDK that should answer some of your questions about that option. Specifically, for your requirement d), the SDK worker could apply the pushdown before processing. Since SDK workers are shared across runners, that would mean the individual runners themselves would not have to re-implement pushdown. This approach has other drawbacks, though, which I have enumerated in my design doc.
>> >>> >
>> >>> > [1] https://github.com/apache/beam/pull/15351
>> >>> >
>> >>> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com> wrote:
>> >>> >>
>> >>> >> Hi Brian. I think that's an excele
>> >>> >>
>> >>> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <bh...@google.com> wrote:
>> >>> >>>
>> >>> >>> Thanks for the proposal Kyle! I wanted to write up some thoughts around the portable proposal (Option 2), and it grew well beyond the size of a doc comment.
>> >>> >>>
>> >>> >>> The basic requirements for this work, which are satisfied by Option 1, are:
>> >>> >>> - to allow pushdown into *Java SDK* sources
>> >>> >>> - based on field accesses in *Java SDK* transforms
>> >>> >>> - when building a pipeline with the *Java SDK*.
>> >>> >>>
>> >>> >>> However down the road I would really like to see all of those requirements go from *Java SDK* to *any SDK*. It doesn't need to happen right away, but it would be great to avoid going down a path that's not amenable to such an approach in the future. So it's worth thinking about what the "any SDK" approach will look like now. (Presumably others feel this way as well, thus Option 2).
>> >>> >>>
>> >>> >>> Thinking about a portable version, we can break down these requirements further:
>> >>> >>> - To support pushdown into *any SDK* sources we need
>> >>> >>>   a) a portable way to define (or discover) what projections a source is capable of
>> >>> >>>   b) a portable way to modify the source to perform those projections
>> >>> >>> - To support tracking field accesses in *any SDK* transforms we need
>> >>> >>>   c) a portable way to annotate field accesses (this isn't so bad, we know how to annotate transforms, but the devil is in the details I suppose).
>> >>> >>> - To support pushdown when building a pipeline with *any SDK* we need
>> >>> >>>   d) some SDK-agnostic code for applying the pushdown
>> >>> >>>
>> >>> >>> I think (a,b,c) are solvable, but they depend on (d), which is a big unknown (to me). It looks like Option 2 pursues putting this pushdown code in runners, and having them operate on the Runner API protos directly. I have a few concerns with this:
>> >>> >>> - It precludes having complex logic for (a,b) in the SDKs. This was touched on in your document already [1]. I don't think it's that problematic for projection, but as we look forward to predicate pushdown I think we'll need to allow complex logic in the SDK for discovering supported predicates (e.g. see the logic for predicate discovery in BigQuery [2]).
>> >>> >>> - Will individual runners be responsible for implementing this? We could put the logic in runners-core-construction-java to re-use it across JVM-based runners, but what about others? Does this just move the code duplication from the SDK side to the runner side?
>> >>> >>> - Finally, Option 1 is not really a stepping stone to this approach.
>> >>> >>>
>> >>> >>> An alternative portable approach might be to have an intermediate cross-SDK and cross-runner "optimizer", which (optionally) runs between the SDK and the runner. It could send expansion requests back to SDKs to "re-expand" sources with projections (and later predicates) applied. This would address all of the above:
>> >>> >>> - Since we make expansion requests to apply projections, the SDKs can implement complex logic for (a,b).
>> >>> >>> - The pushdown code isn't in a runner or an SDK, we'd only need one optimizer implementation.
>> >>> >>> - Option 1 could be a stepping stone to this approach. The optimizer could be implemented with code from Option 1, if it's written with this in mind.
>> >>> >>>
>> >>> >>> This does have some downsides:
>> >>> >>> - Sources are expanded twice. IO authors will need to avoid side effects in expansion methods, and any expensive logic will be executed twice.
>> >>> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly separable though: if you don't want pushdown you don't need Java).
>> >>> >>>
>> >>> >>> I'm curious what others think about this approach. Is it tenable? Am I missing something?
>> >>> >>>
>> >>> >>> Thanks!
>> >>> >>> Brian
>> >>> >>>
>> >>> >>> [1] https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
>> >>> >>> [2] https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>> >>> >>>
>> >>> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com> wrote:
>> >>> >>>>
>> >>> >>>> Thanks, I made a pass over Option 2
>> >>> >>>>
>> >>> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com> wrote:
>> >>> >>>>>
>> >>> >>>>> Hi again everyone, thanks for all of your feedback. I've almost completely rewritten the document to include another design option and address more possibilities and challenges. Please take a look.
>> >>> >>>>>
>> >>> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com> wrote:
>> >>> >>>>>>
>> >>> >>>>>> Very happy to see we will have pushdown optimizations for java pipelines! Thanks for sharing the proposal.
>> >>> >>>>>>
>> >>> >>>>>> Thanks,
>> >>> >>>>>> XInyu
>> >>> >>>>>>
>> >>> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <ar...@gmail.com> wrote:
>> >>> >>>>>>>
>> >>> >>>>>>> Thanks Kyle, very promising. I left some comments.
>> >>> >>>>>>>
>> >>> >>>>>>> —
>> >>> >>>>>>> Alexey
>> >>> >>>>>>>
>> >>> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
>> >>> >>>>>>>
>> >>> >>>>>>> Thanks, I took a look at it.
>> >>> >>>>>>>
>> >>> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <kc...@google.com> wrote:
>> >>> >>>>>>>>
>> >>> >>>>>>>> Hi Beam devs,
>> >>> >>>>>>>>
>> >>> >>>>>>>> I'm back with another proposal involving Schema-based optimization in the Beam Java SDK. This one builds on the ideas in my previous proposal and is broader in scope. Please leave comments if this area is of interest to you.
>> >>> >>>>>>>>
>> >>> >>>>>>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>> >>> >>>>>>>>
>> >>> >>>>>>>> Thank you,
>> >>> >>>>>>>> Kyle
>> >>> >>>>>>>
>> >>> >>>>>>>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Luke Cwik <lc...@google.com>.
We could make SDKs that support projection/pushdown/... be required to
expose an expansion like API as part of the SDK container. This way we
don't have to worry about the original expansion service that may have
produced the transforms in that case.

As for "d) some SDK-agnostic code for applying the pushdown", can we not
have a runner say "filter on field X and Y" in an agnostic way as part of
the ProcessBundleDescriptor and then have the SDK apply the optimization
using SDK specific code? This way the SDK would be responsible for
converting the generic representation into the IO specific representation.

On Thu, Sep 16, 2021 at 5:01 PM Robert Bradshaw <ro...@google.com> wrote:

>  On Thu, Sep 16, 2021 at 3:45 PM Kyle Weaver <kc...@google.com> wrote:
> >>
> >> I think this
> >> came up earlier, but our discussion could be informed by concrete
> >> examples of 4-5 sources that support pushdown and provide sufficient
> >> diversity.
> >
> > I looked at the four IOs that support pushdown in BeamSqlTable:
> BigQuery, BigTable, MongoDB, and Parquet. Of the four, only ParquetIO
> expands into a simple ParDo(DoFn).
>
> Interesting. In Python ParquetIO is the complicated one that expands
> into a Read.from(BoundedSource) followed by a DoFn.
>
> > The other three all expand into Read.from(BoundedSource).
>
> This is the nice, trivial case where there is no composite structure
> to worry about. Do we have any examples where we'd need additional
> complexity to be able to expand composites differently?
>
> > This introduces yet another layer of indirection. BoundedSource is where
> we must receive the pushdown request, but the BoundedSourceAsSDFWrapperFn
> is the actual DoFn, so we would need to pass pushdown information from
> BoundedSourceAsSDFWrapperFn to BoundedSource.
>
> I don't think this extra level of indirection is a problem. It's what
> one would expect of a shim whose purpose is to wrap one API in
> another, and should be quite straightforward.
>
> > This is a symptom of a larger problem: option 3 requires plumbing
> pushdown information through numerous layers of the Beam stack, from the
> protos to the SDK worker to the SDK’s DoFn implementation, and finally all
> the way through each IO implementation. In contrast, option 1 is entirely
> self-contained. It requires one method for pipeline optimization, and one
> trivial withProjectionPushdown method per IO.
>
> It's true that an option scoped to a single language/SDK can be more
> self-contained, but the whole point is that we don't want to be so
> self-contained. Anything that operates at a language-agnostic level
> requires plumbing between the user code and the protos, but this is
> O(1) work and you're done (<200 lines for Python) and are back to just
> writing a projection method per IO (or, at most, per step in the IO,
> though it looks like that's usually 1 per the four sources mentioned
> above).
>
>
> > Whether option 1 is a stepping stone to the portable/cross-language
> solution remains to be seen; it depends on the portable/cross-language
> solution we end up choosing. But it's the simplest solution to the problem
> I set out to solve.
> >
> >
> >
> > On Thu, Sep 16, 2021 at 12:59 PM Brian Hulette <bh...@google.com>
> wrote:
> >>>
> >>> This would be a little bit like the environment
> >>> (though it would also get attached to composites); maybe a reference
> >>> to docker image that could get started up and communicated with?
> >>
> >>
> >> I was thinking along similar lines. The idea I had was that composites
> from the non-submitting SDK could be annotated with the expansion service
> that they came from, and we'd have a "loopback" service for the submitting
> SDK transforms. But that would require that the expansion services are
> still running, or we include enough information to start them up again.
> >>
> >>> Note that this does not obviate the need to come up with some
> >>> language-agnostic way of defining the projections/predicates one wants
> >>> to perform, but it does let one indicate whether one supports a given
> >>> projection in code rather than having to come up with a language for
> >>> that as well. The primary advantage is that it allows one to act on
> >>> composites rather than at the level of primitives which is nice.
> >>> Whether this is necessary depends on how often full expansions (rather
> >>> than just the primitives) differ when a projection/predicate is pushed
> >>> down, and I have yet to come up with an example of that. (I think this
> >>> came up earlier, but our discussion could be informed by concrete
> >>> examples of 4-5 sources that support pushdown and provide sufficient
> >>> diversity.)
> >>
> >>
> >> Thanks, this articulates the advantages well. I don't think we can
> avoid having a language-agnostic way to represent projections/predicates
> (there may be something open source we can leverage or at least model here,
> e.g. I'm keeping an eye on Substrait [1,2]).
> >> I think you're probably right that it's rare a source with a pushdown
> applied will need a different set of primitives. So the biggest advantage
> of Option 4 is that it lets you articulate supported pushdowns in code. If
> we create an entirely new "pushdown" service, maybe it could just handle
> that instead of doing the full expansion.
> >>
> >> [1] https://substrait.io/
> >> [2]
> https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E
> >>
> >> On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>>
> >>> The re-expansion idea is an interesting one. Let's call this Option 4.
> >>> The main question is how it would be invoked.
> >>>
> >>> As well as adding complexity, I don't think it would work to do it as
> >>> part of the SDK's submission to the runner. For one thing, it'd be
> >>> best if we could just fire-and-forget, even writing the spec to disk
> >>> and then having the runner pick it up later, and we'd have to figure
> >>> out how to make this work with interactive and similar services that
> >>> act as runners but are really proxies for other runners. The other
> >>> problem is that the submitting SDK may not be the SDK that generated
> >>> the transform.
> >>>
> >>> The other option would be to have some kind of a "pushdown service"
> >>> that would be referenced on the transform and could be invoked to do
> >>> the re-expansion. This would be a little bit like the environment
> >>> (though it would also get attached to composites); maybe a reference
> >>> to docker image that could get started up and communicated with?
> >>> (Maybe the SDK image itself could be used with an alternative
> >>> entrypoint? Assuming docker-on-docker works well...) Might get a bit
> >>> heavy-weight, but maybe we make it optional (though I'd rather it be
> >>> something you get by default).
> >>>
> >>> Definitely worth expanding more.
> >>>
> >>>
> >>>
> >>> My primary concern with Option 1 has been that it does not seem a
> >>> stepping stone for anything portable/multi-language, and as we'll
> >>> definitely want to support that (IMHO sooner rather than later) I
> >>> think whatever we build should go in that direction.
> >>>
> >>> Note that this does not obviate the need to come up with some
> >>> language-agnostic way of defining the projections/predicates one wants
> >>> to perform, but it does let one indicate whether one supports a given
> >>> projection in code rather than having to come up with a language for
> >>> that as well. The primary advantage is that it allows one to act on
> >>> composites rather than at the level of primitives which is nice.
> >>> Whether this is necessary depends on how often full expansions (rather
> >>> than just the primitives) differ when a projection/predicate is pushed
> >>> down, and I have yet to come up with an example of that. (I think this
> >>> came up earlier, but our discussion could be informed by concrete
> >>> examples of 4-5 sources that support pushdown and provide sufficient
> >>> diversity.)
> >>>
> >>> I do think Option 3 could be a stepping stone to Option 4, viewing it
> >>> as a mechanism for "simple" pushdowns to be done inline rather than a
> >>> via remote execution.
> >>>
> >>> - Robert
> >>>
> >>>
> >>> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com>
> wrote:
> >>> >
> >>> > Oops, I accidentally hit the send button...
> >>> >
> >>> > As I was saying, I think that's an excellent idea. I discussed this
> with Andrew previously and we came to a similar conclusion, that
> re-expansion is more flexible than annotations, especially when it comes to
> predicate pushdown.
> >>> >
> >>> > Robert made a prototype [1] of Option 2 in the Python SDK that
> should answer some of your questions about that option. Specifically, for
> your requirement d), the SDK worker could apply the pushdown before
> processing. Since SDK workers are shared across runners, that would mean
> the individual runners themselves would not have to re-implement pushdown.
> This approach has other drawbacks, though, which I have enumerated in my
> design doc.
> >>> >
> >>> > [1] https://github.com/apache/beam/pull/15351
> >>> >
> >>> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com>
> wrote:
> >>> >>
> >>> >> Hi Brian. I think that's an excele
> >>> >>
> >>> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <bh...@google.com>
> wrote:
> >>> >>>
> >>> >>> Thanks for the proposal Kyle! I wanted to write up some thoughts
> around the portable proposal (Option 2), and it grew well beyond the size
> of a doc comment.
> >>> >>>
> >>> >>> The basic requirements for this work, which are satisfied by
> Option 1, are:
> >>> >>> - to allow pushdown into *Java SDK* sources
> >>> >>> - based on field accesses in *Java SDK* transforms
> >>> >>> - when building a pipeline with the *Java SDK*.
> >>> >>>
> >>> >>> However down the road I would really like to see all of those
> requirements go from *Java SDK* to *any SDK*. It doesn't need to happen
> right away, but it would be great to avoid going down a path that's not
> amenable to such an approach in the future. So it's worth thinking about
> what the "any SDK" approach will look like now. (Presumably others feel
> this way as well, thus Option 2).
> >>> >>>
> >>> >>> Thinking about a portable version, we can break down these
> requirements further:
> >>> >>> - To support pushdown into *any SDK* sources we need
> >>> >>>   a) a portable way to define (or discover) what projections a
> source is capable of
> >>> >>>   b) a portable way to modify the source to perform those
> projections
> >>> >>> - To support tracking field accesses in *any SDK* transforms we
> need
> >>> >>>   c) a portable way to annotate field accesses (this isn't so bad,
> we know how to annotate transforms, but the devil is in the details I
> suppose).
> >>> >>> - To support pushdown when building a pipeline with *any SDK* we
> need
> >>> >>>   d) some SDK-agnostic code for applying the pushdown
> >>> >>>
> >>> >>> I think (a,b,c) are solvable, but they depend on (d), which is a
> big unknown (to me). It looks like Option 2 pursues putting this pushdown
> code in runners, and having them operate on the Runner API protos directly.
> I have a few concerns with this:
> >>> >>> - It precludes having complex logic for (a,b) in the SDKs. This
> was touched on in your document already [1]. I don't think it's that
> problematic for projection, but as we look forward to predicate pushdown I
> think we'll need to allow complex logic in the SDK for discovering
> supported predicates (e.g. see the logic for predicate discovery in
> BigQuery [2]).
> >>> >>> - Will individual runners be responsible for implementing this? We
> could put the logic in runners-core-construction-java to re-use it across
> JVM-based runners, but what about others? Does this just move the code
> duplication from the SDK side to the runner side?
> >>> >>> - Finally, Option 1 is not really a stepping stone to this
> approach.
> >>> >>>
> >>> >>> An alternative portable approach might be to have an intermediate
> cross-SDK and cross-runner "optimizer", which (optionally) runs between the
> SDK and the runner. It could send expansion requests back to SDKs to
> "re-expand" sources with projections (and later predicates) applied. This
> would address all of the above:
> >>> >>> - Since we make expansion requests to apply projections, the SDKs
> can implement complex logic for (a,b).
> >>> >>> - The pushdown code isn't in a runner or an SDK, we'd only need
> one optimizer implementation.
> >>> >>> - Option 1 could be a stepping stone to this approach. The
> optimizer could be implemented with code from Option 1, if it's written
> with this in mind.
> >>> >>>
> >>> >>> This does have some downsides:
> >>> >>> - Sources are expanded twice. IO authors will need to avoid side
> effects in expansion methods, and any expensive logic will be executed
> twice.
> >>> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly
> separable though: if you don't want pushdown you don't need Java).
> >>> >>>
> >>> >>> I'm curious what others think about this approach. Is it tenable?
> Am I missing something?
> >>> >>>
> >>> >>> Thanks!
> >>> >>> Brian
> >>> >>>
> >>> >>> [1]
> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
> >>> >>> [2]
> https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
> >>> >>>
> >>> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com>
> wrote:
> >>> >>>>
> >>> >>>> Thanks, I made a pass over Option 2
> >>> >>>>
> >>> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com>
> wrote:
> >>> >>>>>
> >>> >>>>> Hi again everyone, thanks for all of your feedback. I've almost
> completely rewritten the document to include another design option and
> address more possibilities and challenges. Please take a look.
> >>> >>>>>
> >>> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com>
> wrote:
> >>> >>>>>>
> >>> >>>>>> Very happy to see we will have pushdown optimizations for java
> pipelines! Thanks for sharing the proposal.
> >>> >>>>>>
> >>> >>>>>> Thanks,
> >>> >>>>>> XInyu
> >>> >>>>>>
> >>> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
> >>> >>>>>>>
> >>> >>>>>>> Thanks Kyle, very promising. I left some comments.
> >>> >>>>>>>
> >>> >>>>>>> —
> >>> >>>>>>> Alexey
> >>> >>>>>>>
> >>> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
> >>> >>>>>>>
> >>> >>>>>>> Thanks, I took a look at it.
> >>> >>>>>>>
> >>> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <
> kcweaver@google.com> wrote:
> >>> >>>>>>>>
> >>> >>>>>>>> Hi Beam devs,
> >>> >>>>>>>>
> >>> >>>>>>>> I'm back with another proposal involving Schema-based
> optimization in the Beam Java SDK. This one builds on the ideas in my
> previous proposal and is broader in scope. Please leave comments if this
> area is of interest to you.
> >>> >>>>>>>>
> >>> >>>>>>>>
> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
> >>> >>>>>>>>
> >>> >>>>>>>> Thank you,
> >>> >>>>>>>> Kyle
> >>> >>>>>>>
> >>> >>>>>>>
>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Robert Bradshaw <ro...@google.com>.
 On Thu, Sep 16, 2021 at 3:45 PM Kyle Weaver <kc...@google.com> wrote:
>>
>> I think this
>> came up earlier, but our discussion could be informed by concrete
>> examples of 4-5 sources that support pushdown and provide sufficient
>> diversity.
>
> I looked at the four IOs that support pushdown in BeamSqlTable: BigQuery, BigTable, MongoDB, and Parquet. Of the four, only ParquetIO expands into a simple ParDo(DoFn).

Interesting. In Python ParquetIO is the complicated one that expands
into a Read.from(BoundedSource) followed by a DoFn.

> The other three all expand into Read.from(BoundedSource).

This is the nice, trivial case where there is no composite structure
to worry about. Do we have any examples where we'd need additional
complexity to be able to expand composites differently?

> This introduces yet another layer of indirection. BoundedSource is where we must receive the pushdown request, but the BoundedSourceAsSDFWrapperFn is the actual DoFn, so we would need to pass pushdown information from BoundedSourceAsSDFWrapperFn to BoundedSource.

I don't think this extra level of indirection is a problem. It's what
one would expect of a shim whose purpose is to wrap one API in
another, and should be quite straightforward.

> This is a symptom of a larger problem: option 3 requires plumbing pushdown information through numerous layers of the Beam stack, from the protos to the SDK worker to the SDK’s DoFn implementation, and finally all the way through each IO implementation. In contrast, option 1 is entirely self-contained. It requires one method for pipeline optimization, and one trivial withProjectionPushdown method per IO.

It's true that an option scoped to a single language/SDK can be more
self-contained, but the whole point is that we don't want to be so
self-contained. Anything that operates at a language-agnostic level
requires plumbing between the user code and the protos, but this is
O(1) work and you're done (<200 lines for Python) and are back to just
writing a projection method per IO (or, at most, per step in the IO,
though it looks like that's usually 1 per the four sources mentioned
above).


> Whether option 1 is a stepping stone to the portable/cross-language solution remains to be seen; it depends on the portable/cross-language solution we end up choosing. But it's the simplest solution to the problem I set out to solve.
>
>
>
> On Thu, Sep 16, 2021 at 12:59 PM Brian Hulette <bh...@google.com> wrote:
>>>
>>> This would be a little bit like the environment
>>> (though it would also get attached to composites); maybe a reference
>>> to docker image that could get started up and communicated with?
>>
>>
>> I was thinking along similar lines. The idea I had was that composites from the non-submitting SDK could be annotated with the expansion service that they came from, and we'd have a "loopback" service for the submitting SDK transforms. But that would require that the expansion services are still running, or we include enough information to start them up again.
>>
>>> Note that this does not obviate the need to come up with some
>>> language-agnostic way of defining the projections/predicates one wants
>>> to perform, but it does let one indicate whether one supports a given
>>> projection in code rather than having to come up with a language for
>>> that as well. The primary advantage is that it allows one to act on
>>> composites rather than at the level of primitives which is nice.
>>> Whether this is necessary depends on how often full expansions (rather
>>> than just the primitives) differ when a projection/predicate is pushed
>>> down, and I have yet to come up with an example of that. (I think this
>>> came up earlier, but our discussion could be informed by concrete
>>> examples of 4-5 sources that support pushdown and provide sufficient
>>> diversity.)
>>
>>
>> Thanks, this articulates the advantages well. I don't think we can avoid having a language-agnostic way to represent projections/predicates (there may be something open source we can leverage or at least model here, e.g. I'm keeping an eye on Substrait [1,2]).
>> I think you're probably right that it's rare a source with a pushdown applied will need a different set of primitives. So the biggest advantage of Option 4 is that it lets you articulate supported pushdowns in code. If we create an entirely new "pushdown" service, maybe it could just handle that instead of doing the full expansion.
>>
>> [1] https://substrait.io/
>> [2] https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E
>>
>> On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <ro...@google.com> wrote:
>>>
>>> The re-expansion idea is an interesting one. Let's call this Option 4.
>>> The main question is how it would be invoked.
>>>
>>> As well as adding complexity, I don't think it would work to do it as
>>> part of the SDK's submission to the runner. For one thing, it'd be
>>> best if we could just fire-and-forget, even writing the spec to disk
>>> and then having the runner pick it up later, and we'd have to figure
>>> out how to make this work with interactive and similar services that
>>> act as runners but are really proxies for other runners. The other
>>> problem is that the submitting SDK may not be the SDK that generated
>>> the transform.
>>>
>>> The other option would be to have some kind of a "pushdown service"
>>> that would be referenced on the transform and could be invoked to do
>>> the re-expansion. This would be a little bit like the environment
>>> (though it would also get attached to composites); maybe a reference
>>> to docker image that could get started up and communicated with?
>>> (Maybe the SDK image itself could be used with an alternative
>>> entrypoint? Assuming docker-on-docker works well...) Might get a bit
>>> heavy-weight, but maybe we make it optional (though I'd rather it be
>>> something you get by default).
>>>
>>> Definitely worth expanding more.
>>>
>>>
>>>
>>> My primary concern with Option 1 has been that it does not seem a
>>> stepping stone for anything portable/multi-language, and as we'll
>>> definitely want to support that (IMHO sooner rather than later) I
>>> think whatever we build should go in that direction.
>>>
>>> Note that this does not obviate the need to come up with some
>>> language-agnostic way of defining the projections/predicates one wants
>>> to perform, but it does let one indicate whether one supports a given
>>> projection in code rather than having to come up with a language for
>>> that as well. The primary advantage is that it allows one to act on
>>> composites rather than at the level of primitives which is nice.
>>> Whether this is necessary depends on how often full expansions (rather
>>> than just the primitives) differ when a projection/predicate is pushed
>>> down, and I have yet to come up with an example of that. (I think this
>>> came up earlier, but our discussion could be informed by concrete
>>> examples of 4-5 sources that support pushdown and provide sufficient
>>> diversity.)
>>>
>>> I do think Option 3 could be a stepping stone to Option 4, viewing it
>>> as a mechanism for "simple" pushdowns to be done inline rather than a
>>> via remote execution.
>>>
>>> - Robert
>>>
>>>
>>> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com> wrote:
>>> >
>>> > Oops, I accidentally hit the send button...
>>> >
>>> > As I was saying, I think that's an excellent idea. I discussed this with Andrew previously and we came to a similar conclusion, that re-expansion is more flexible than annotations, especially when it comes to predicate pushdown.
>>> >
>>> > Robert made a prototype [1] of Option 2 in the Python SDK that should answer some of your questions about that option. Specifically, for your requirement d), the SDK worker could apply the pushdown before processing. Since SDK workers are shared across runners, that would mean the individual runners themselves would not have to re-implement pushdown. This approach has other drawbacks, though, which I have enumerated in my design doc.
>>> >
>>> > [1] https://github.com/apache/beam/pull/15351
>>> >
>>> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com> wrote:
>>> >>
>>> >> Hi Brian. I think that's an excele
>>> >>
>>> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <bh...@google.com> wrote:
>>> >>>
>>> >>> Thanks for the proposal Kyle! I wanted to write up some thoughts around the portable proposal (Option 2), and it grew well beyond the size of a doc comment.
>>> >>>
>>> >>> The basic requirements for this work, which are satisfied by Option 1, are:
>>> >>> - to allow pushdown into *Java SDK* sources
>>> >>> - based on field accesses in *Java SDK* transforms
>>> >>> - when building a pipeline with the *Java SDK*.
>>> >>>
>>> >>> However down the road I would really like to see all of those requirements go from *Java SDK* to *any SDK*. It doesn't need to happen right away, but it would be great to avoid going down a path that's not amenable to such an approach in the future. So it's worth thinking about what the "any SDK" approach will look like now. (Presumably others feel this way as well, thus Option 2).
>>> >>>
>>> >>> Thinking about a portable version, we can break down these requirements further:
>>> >>> - To support pushdown into *any SDK* sources we need
>>> >>>   a) a portable way to define (or discover) what projections a source is capable of
>>> >>>   b) a portable way to modify the source to perform those projections
>>> >>> - To support tracking field accesses in *any SDK* transforms we need
>>> >>>   c) a portable way to annotate field accesses (this isn't so bad, we know how to annotate transforms, but the devil is in the details I suppose).
>>> >>> - To support pushdown when building a pipeline with *any SDK* we need
>>> >>>   d) some SDK-agnostic code for applying the pushdown
>>> >>>
>>> >>> I think (a,b,c) are solvable, but they depend on (d), which is a big unknown (to me). It looks like Option 2 pursues putting this pushdown code in runners, and having them operate on the Runner API protos directly. I have a few concerns with this:
>>> >>> - It precludes having complex logic for (a,b) in the SDKs. This was touched on in your document already [1]. I don't think it's that problematic for projection, but as we look forward to predicate pushdown I think we'll need to allow complex logic in the SDK for discovering supported predicates (e.g. see the logic for predicate discovery in BigQuery [2]).
>>> >>> - Will individual runners be responsible for implementing this? We could put the logic in runners-core-construction-java to re-use it across JVM-based runners, but what about others? Does this just move the code duplication from the SDK side to the runner side?
>>> >>> - Finally, Option 1 is not really a stepping stone to this approach.
>>> >>>
>>> >>> An alternative portable approach might be to have an intermediate cross-SDK and cross-runner "optimizer", which (optionally) runs between the SDK and the runner. It could send expansion requests back to SDKs to "re-expand" sources with projections (and later predicates) applied. This would address all of the above:
>>> >>> - Since we make expansion requests to apply projections, the SDKs can implement complex logic for (a,b).
>>> >>> - The pushdown code isn't in a runner or an SDK, we'd only need one optimizer implementation.
>>> >>> - Option 1 could be a stepping stone to this approach. The optimizer could be implemented with code from Option 1, if it's written with this in mind.
>>> >>>
>>> >>> This does have some downsides:
>>> >>> - Sources are expanded twice. IO authors will need to avoid side effects in expansion methods, and any expensive logic will be executed twice.
>>> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly separable though: if you don't want pushdown you don't need Java).
>>> >>>
>>> >>> I'm curious what others think about this approach. Is it tenable? Am I missing something?
>>> >>>
>>> >>> Thanks!
>>> >>> Brian
>>> >>>
>>> >>> [1] https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
>>> >>> [2] https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>>> >>>
>>> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com> wrote:
>>> >>>>
>>> >>>> Thanks, I made a pass over Option 2
>>> >>>>
>>> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com> wrote:
>>> >>>>>
>>> >>>>> Hi again everyone, thanks for all of your feedback. I've almost completely rewritten the document to include another design option and address more possibilities and challenges. Please take a look.
>>> >>>>>
>>> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com> wrote:
>>> >>>>>>
>>> >>>>>> Very happy to see we will have pushdown optimizations for java pipelines! Thanks for sharing the proposal.
>>> >>>>>>
>>> >>>>>> Thanks,
>>> >>>>>> XInyu
>>> >>>>>>
>>> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <ar...@gmail.com> wrote:
>>> >>>>>>>
>>> >>>>>>> Thanks Kyle, very promising. I left some comments.
>>> >>>>>>>
>>> >>>>>>> —
>>> >>>>>>> Alexey
>>> >>>>>>>
>>> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
>>> >>>>>>>
>>> >>>>>>> Thanks, I took a look at it.
>>> >>>>>>>
>>> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <kc...@google.com> wrote:
>>> >>>>>>>>
>>> >>>>>>>> Hi Beam devs,
>>> >>>>>>>>
>>> >>>>>>>> I'm back with another proposal involving Schema-based optimization in the Beam Java SDK. This one builds on the ideas in my previous proposal and is broader in scope. Please leave comments if this area is of interest to you.
>>> >>>>>>>>
>>> >>>>>>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>>> >>>>>>>>
>>> >>>>>>>> Thank you,
>>> >>>>>>>> Kyle
>>> >>>>>>>
>>> >>>>>>>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Kyle Weaver <kc...@google.com>.
>
> I think this
> came up earlier, but our discussion could be informed by concrete
> examples of 4-5 sources that support pushdown and provide sufficient
> diversity.
>

I looked at the four IOs that support pushdown in BeamSqlTable: BigQuery,
BigTable, MongoDB, and Parquet. Of the four, only ParquetIO expands into a
simple ParDo(DoFn). The other three all expand into
Read.from(BoundedSource). This introduces yet another layer of indirection.
BoundedSource is where we must receive the pushdown request, but the
BoundedSourceAsSDFWrapperFn is the actual DoFn, so we would need to pass
pushdown information from BoundedSourceAsSDFWrapperFn to BoundedSource.

This is a symptom of a larger problem: option 3 requires plumbing pushdown
information through numerous layers of the Beam stack, from the protos to
the SDK worker to the SDK’s DoFn implementation, and finally all the way
through each IO implementation. In contrast, option 1 is entirely
self-contained. It requires one method for pipeline optimization, and one
trivial withProjectionPushdown method per IO.


Whether option 1 is a stepping stone to the portable/cross-language
solution remains to be seen; it depends on the portable/cross-language
solution we end up choosing. But it's the simplest solution to the problem
I set out to solve.


On Thu, Sep 16, 2021 at 12:59 PM Brian Hulette <bh...@google.com> wrote:

> This would be a little bit like the environment
>> (though it would also get attached to composites); maybe a reference
>> to docker image that could get started up and communicated with?
>
>
> I was thinking along similar lines. The idea I had was that composites
> from the non-submitting SDK could be annotated with the expansion service
> that they came from, and we'd have a "loopback" service for the submitting
> SDK transforms. But that would require that the expansion services are
> still running, or we include enough information to start them up again.
>
> Note that this does not obviate the need to come up with some
>> language-agnostic way of defining the projections/predicates one wants
>> to perform, but it does let one indicate whether one supports a given
>> projection in code rather than having to come up with a language for
>> that as well. The primary advantage is that it allows one to act on
>> composites rather than at the level of primitives which is nice.
>> Whether this is necessary depends on how often full expansions (rather
>> than just the primitives) differ when a projection/predicate is pushed
>> down, and I have yet to come up with an example of that. (I think this
>> came up earlier, but our discussion could be informed by concrete
>> examples of 4-5 sources that support pushdown and provide sufficient
>> diversity.)
>
>
> Thanks, this articulates the advantages well. I don't think we can avoid
> having a language-agnostic way to represent projections/predicates (there
> may be something open source we can leverage or at least model here, e.g.
> I'm keeping an eye on Substrait [1,2]).
> I think you're probably right that it's rare a source with a pushdown
> applied will need a different set of primitives. So the biggest advantage
> of Option 4 is that it lets you articulate supported pushdowns in code. If
> we create an entirely new "pushdown" service, maybe it could just handle
> that instead of doing the full expansion.
>
> [1] https://substrait.io/
> [2]
> https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E
>
> On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> The re-expansion idea is an interesting one. Let's call this Option 4.
>> The main question is how it would be invoked.
>>
>> As well as adding complexity, I don't think it would work to do it as
>> part of the SDK's submission to the runner. For one thing, it'd be
>> best if we could just fire-and-forget, even writing the spec to disk
>> and then having the runner pick it up later, and we'd have to figure
>> out how to make this work with interactive and similar services that
>> act as runners but are really proxies for other runners. The other
>> problem is that the submitting SDK may not be the SDK that generated
>> the transform.
>>
>> The other option would be to have some kind of a "pushdown service"
>> that would be referenced on the transform and could be invoked to do
>> the re-expansion. This would be a little bit like the environment
>> (though it would also get attached to composites); maybe a reference
>> to docker image that could get started up and communicated with?
>> (Maybe the SDK image itself could be used with an alternative
>> entrypoint? Assuming docker-on-docker works well...) Might get a bit
>> heavy-weight, but maybe we make it optional (though I'd rather it be
>> something you get by default).
>>
>> Definitely worth expanding more.
>
>
>>
>> My primary concern with Option 1 has been that it does not seem a
>> stepping stone for anything portable/multi-language, and as we'll
>> definitely want to support that (IMHO sooner rather than later) I
>> think whatever we build should go in that direction.
>>
>> Note that this does not obviate the need to come up with some
>> language-agnostic way of defining the projections/predicates one wants
>> to perform, but it does let one indicate whether one supports a given
>> projection in code rather than having to come up with a language for
>> that as well. The primary advantage is that it allows one to act on
>> composites rather than at the level of primitives which is nice.
>> Whether this is necessary depends on how often full expansions (rather
>> than just the primitives) differ when a projection/predicate is pushed
>> down, and I have yet to come up with an example of that. (I think this
>> came up earlier, but our discussion could be informed by concrete
>> examples of 4-5 sources that support pushdown and provide sufficient
>> diversity.)
>>
>> I do think Option 3 could be a stepping stone to Option 4, viewing it
>> as a mechanism for "simple" pushdowns to be done inline rather than a
>> via remote execution.
>>
>> - Robert
>>
>>
>> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com> wrote:
>> >
>> > Oops, I accidentally hit the send button...
>> >
>> > As I was saying, I think that's an excellent idea. I discussed this
>> with Andrew previously and we came to a similar conclusion, that
>> re-expansion is more flexible than annotations, especially when it comes to
>> predicate pushdown.
>> >
>> > Robert made a prototype [1] of Option 2 in the Python SDK that should
>> answer some of your questions about that option. Specifically, for your
>> requirement d), the SDK worker could apply the pushdown before processing.
>> Since SDK workers are shared across runners, that would mean the individual
>> runners themselves would not have to re-implement pushdown. This approach
>> has other drawbacks, though, which I have enumerated in my design doc.
>> >
>> > [1] https://github.com/apache/beam/pull/15351
>> >
>> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com>
>> wrote:
>> >>
>> >> Hi Brian. I think that's an excele
>> >>
>> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <bh...@google.com>
>> wrote:
>> >>>
>> >>> Thanks for the proposal Kyle! I wanted to write up some thoughts
>> around the portable proposal (Option 2), and it grew well beyond the size
>> of a doc comment.
>> >>>
>> >>> The basic requirements for this work, which are satisfied by Option
>> 1, are:
>> >>> - to allow pushdown into *Java SDK* sources
>> >>> - based on field accesses in *Java SDK* transforms
>> >>> - when building a pipeline with the *Java SDK*.
>> >>>
>> >>> However down the road I would really like to see all of those
>> requirements go from *Java SDK* to *any SDK*. It doesn't need to happen
>> right away, but it would be great to avoid going down a path that's not
>> amenable to such an approach in the future. So it's worth thinking about
>> what the "any SDK" approach will look like now. (Presumably others feel
>> this way as well, thus Option 2).
>> >>>
>> >>> Thinking about a portable version, we can break down these
>> requirements further:
>> >>> - To support pushdown into *any SDK* sources we need
>> >>>   a) a portable way to define (or discover) what projections a source
>> is capable of
>> >>>   b) a portable way to modify the source to perform those projections
>> >>> - To support tracking field accesses in *any SDK* transforms we need
>> >>>   c) a portable way to annotate field accesses (this isn't so bad, we
>> know how to annotate transforms, but the devil is in the details I suppose).
>> >>> - To support pushdown when building a pipeline with *any SDK* we need
>> >>>   d) some SDK-agnostic code for applying the pushdown
>> >>>
>> >>> I think (a,b,c) are solvable, but they depend on (d), which is a big
>> unknown (to me). It looks like Option 2 pursues putting this pushdown code
>> in runners, and having them operate on the Runner API protos directly. I
>> have a few concerns with this:
>> >>> - It precludes having complex logic for (a,b) in the SDKs. This was
>> touched on in your document already [1]. I don't think it's that
>> problematic for projection, but as we look forward to predicate pushdown I
>> think we'll need to allow complex logic in the SDK for discovering
>> supported predicates (e.g. see the logic for predicate discovery in
>> BigQuery [2]).
>> >>> - Will individual runners be responsible for implementing this? We
>> could put the logic in runners-core-construction-java to re-use it across
>> JVM-based runners, but what about others? Does this just move the code
>> duplication from the SDK side to the runner side?
>> >>> - Finally, Option 1 is not really a stepping stone to this approach.
>> >>>
>> >>> An alternative portable approach might be to have an intermediate
>> cross-SDK and cross-runner "optimizer", which (optionally) runs between the
>> SDK and the runner. It could send expansion requests back to SDKs to
>> "re-expand" sources with projections (and later predicates) applied. This
>> would address all of the above:
>> >>> - Since we make expansion requests to apply projections, the SDKs can
>> implement complex logic for (a,b).
>> >>> - The pushdown code isn't in a runner or an SDK, we'd only need one
>> optimizer implementation.
>> >>> - Option 1 could be a stepping stone to this approach. The optimizer
>> could be implemented with code from Option 1, if it's written with this in
>> mind.
>> >>>
>> >>> This does have some downsides:
>> >>> - Sources are expanded twice. IO authors will need to avoid side
>> effects in expansion methods, and any expensive logic will be executed
>> twice.
>> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly
>> separable though: if you don't want pushdown you don't need Java).
>> >>>
>> >>> I'm curious what others think about this approach. Is it tenable? Am
>> I missing something?
>> >>>
>> >>> Thanks!
>> >>> Brian
>> >>>
>> >>> [1]
>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
>> >>> [2]
>> https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>> >>>
>> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com> wrote:
>> >>>>
>> >>>> Thanks, I made a pass over Option 2
>> >>>>
>> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com>
>> wrote:
>> >>>>>
>> >>>>> Hi again everyone, thanks for all of your feedback. I've almost
>> completely rewritten the document to include another design option and
>> address more possibilities and challenges. Please take a look.
>> >>>>>
>> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com>
>> wrote:
>> >>>>>>
>> >>>>>> Very happy to see we will have pushdown optimizations for java
>> pipelines! Thanks for sharing the proposal.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> XInyu
>> >>>>>>
>> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>> Thanks Kyle, very promising. I left some comments.
>> >>>>>>>
>> >>>>>>> —
>> >>>>>>> Alexey
>> >>>>>>>
>> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
>> >>>>>>>
>> >>>>>>> Thanks, I took a look at it.
>> >>>>>>>
>> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <kc...@google.com>
>> wrote:
>> >>>>>>>>
>> >>>>>>>> Hi Beam devs,
>> >>>>>>>>
>> >>>>>>>> I'm back with another proposal involving Schema-based
>> optimization in the Beam Java SDK. This one builds on the ideas in my
>> previous proposal and is broader in scope. Please leave comments if this
>> area is of interest to you.
>> >>>>>>>>
>> >>>>>>>>
>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>> >>>>>>>>
>> >>>>>>>> Thank you,
>> >>>>>>>> Kyle
>> >>>>>>>
>> >>>>>>>
>>
>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Brian Hulette <bh...@google.com>.
>
> This would be a little bit like the environment
> (though it would also get attached to composites); maybe a reference
> to docker image that could get started up and communicated with?


I was thinking along similar lines. The idea I had was that composites from
the non-submitting SDK could be annotated with the expansion service that
they came from, and we'd have a "loopback" service for the submitting SDK
transforms. But that would require that the expansion services are still
running, or we include enough information to start them up again.

Note that this does not obviate the need to come up with some
> language-agnostic way of defining the projections/predicates one wants
> to perform, but it does let one indicate whether one supports a given
> projection in code rather than having to come up with a language for
> that as well. The primary advantage is that it allows one to act on
> composites rather than at the level of primitives which is nice.
> Whether this is necessary depends on how often full expansions (rather
> than just the primitives) differ when a projection/predicate is pushed
> down, and I have yet to come up with an example of that. (I think this
> came up earlier, but our discussion could be informed by concrete
> examples of 4-5 sources that support pushdown and provide sufficient
> diversity.)


Thanks, this articulates the advantages well. I don't think we can avoid
having a language-agnostic way to represent projections/predicates (there
may be something open source we can leverage or at least model here, e.g.
I'm keeping an eye on Substrait [1,2]).
I think you're probably right that it's rare a source with a pushdown
applied will need a different set of primitives. So the biggest advantage
of Option 4 is that it lets you articulate supported pushdowns in code. If
we create an entirely new "pushdown" service, maybe it could just handle
that instead of doing the full expansion.

[1] https://substrait.io/
[2]
https://lists.apache.org/thread.html/r6112e3c28cd6759867c3bc37b546c0d8183f9431c6dbe234d3af018f%40%3Cdev.calcite.apache.org%3E

On Wed, Sep 15, 2021 at 6:10 PM Robert Bradshaw <ro...@google.com> wrote:

> The re-expansion idea is an interesting one. Let's call this Option 4.
> The main question is how it would be invoked.
>
> As well as adding complexity, I don't think it would work to do it as
> part of the SDK's submission to the runner. For one thing, it'd be
> best if we could just fire-and-forget, even writing the spec to disk
> and then having the runner pick it up later, and we'd have to figure
> out how to make this work with interactive and similar services that
> act as runners but are really proxies for other runners. The other
> problem is that the submitting SDK may not be the SDK that generated
> the transform.
>
> The other option would be to have some kind of a "pushdown service"
> that would be referenced on the transform and could be invoked to do
> the re-expansion. This would be a little bit like the environment
> (though it would also get attached to composites); maybe a reference
> to docker image that could get started up and communicated with?
> (Maybe the SDK image itself could be used with an alternative
> entrypoint? Assuming docker-on-docker works well...) Might get a bit
> heavy-weight, but maybe we make it optional (though I'd rather it be
> something you get by default).
>
> Definitely worth expanding more.


>
> My primary concern with Option 1 has been that it does not seem a
> stepping stone for anything portable/multi-language, and as we'll
> definitely want to support that (IMHO sooner rather than later) I
> think whatever we build should go in that direction.
>
> Note that this does not obviate the need to come up with some
> language-agnostic way of defining the projections/predicates one wants
> to perform, but it does let one indicate whether one supports a given
> projection in code rather than having to come up with a language for
> that as well. The primary advantage is that it allows one to act on
> composites rather than at the level of primitives which is nice.
> Whether this is necessary depends on how often full expansions (rather
> than just the primitives) differ when a projection/predicate is pushed
> down, and I have yet to come up with an example of that. (I think this
> came up earlier, but our discussion could be informed by concrete
> examples of 4-5 sources that support pushdown and provide sufficient
> diversity.)
>
> I do think Option 3 could be a stepping stone to Option 4, viewing it
> as a mechanism for "simple" pushdowns to be done inline rather than a
> via remote execution.
>
> - Robert
>
>
> On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com> wrote:
> >
> > Oops, I accidentally hit the send button...
> >
> > As I was saying, I think that's an excellent idea. I discussed this with
> Andrew previously and we came to a similar conclusion, that re-expansion is
> more flexible than annotations, especially when it comes to predicate
> pushdown.
> >
> > Robert made a prototype [1] of Option 2 in the Python SDK that should
> answer some of your questions about that option. Specifically, for your
> requirement d), the SDK worker could apply the pushdown before processing.
> Since SDK workers are shared across runners, that would mean the individual
> runners themselves would not have to re-implement pushdown. This approach
> has other drawbacks, though, which I have enumerated in my design doc.
> >
> > [1] https://github.com/apache/beam/pull/15351
> >
> > On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com> wrote:
> >>
> >> Hi Brian. I think that's an excele
> >>
> >> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <bh...@google.com>
> wrote:
> >>>
> >>> Thanks for the proposal Kyle! I wanted to write up some thoughts
> around the portable proposal (Option 2), and it grew well beyond the size
> of a doc comment.
> >>>
> >>> The basic requirements for this work, which are satisfied by Option 1,
> are:
> >>> - to allow pushdown into *Java SDK* sources
> >>> - based on field accesses in *Java SDK* transforms
> >>> - when building a pipeline with the *Java SDK*.
> >>>
> >>> However down the road I would really like to see all of those
> requirements go from *Java SDK* to *any SDK*. It doesn't need to happen
> right away, but it would be great to avoid going down a path that's not
> amenable to such an approach in the future. So it's worth thinking about
> what the "any SDK" approach will look like now. (Presumably others feel
> this way as well, thus Option 2).
> >>>
> >>> Thinking about a portable version, we can break down these
> requirements further:
> >>> - To support pushdown into *any SDK* sources we need
> >>>   a) a portable way to define (or discover) what projections a source
> is capable of
> >>>   b) a portable way to modify the source to perform those projections
> >>> - To support tracking field accesses in *any SDK* transforms we need
> >>>   c) a portable way to annotate field accesses (this isn't so bad, we
> know how to annotate transforms, but the devil is in the details I suppose).
> >>> - To support pushdown when building a pipeline with *any SDK* we need
> >>>   d) some SDK-agnostic code for applying the pushdown
> >>>
> >>> I think (a,b,c) are solvable, but they depend on (d), which is a big
> unknown (to me). It looks like Option 2 pursues putting this pushdown code
> in runners, and having them operate on the Runner API protos directly. I
> have a few concerns with this:
> >>> - It precludes having complex logic for (a,b) in the SDKs. This was
> touched on in your document already [1]. I don't think it's that
> problematic for projection, but as we look forward to predicate pushdown I
> think we'll need to allow complex logic in the SDK for discovering
> supported predicates (e.g. see the logic for predicate discovery in
> BigQuery [2]).
> >>> - Will individual runners be responsible for implementing this? We
> could put the logic in runners-core-construction-java to re-use it across
> JVM-based runners, but what about others? Does this just move the code
> duplication from the SDK side to the runner side?
> >>> - Finally, Option 1 is not really a stepping stone to this approach.
> >>>
> >>> An alternative portable approach might be to have an intermediate
> cross-SDK and cross-runner "optimizer", which (optionally) runs between the
> SDK and the runner. It could send expansion requests back to SDKs to
> "re-expand" sources with projections (and later predicates) applied. This
> would address all of the above:
> >>> - Since we make expansion requests to apply projections, the SDKs can
> implement complex logic for (a,b).
> >>> - The pushdown code isn't in a runner or an SDK, we'd only need one
> optimizer implementation.
> >>> - Option 1 could be a stepping stone to this approach. The optimizer
> could be implemented with code from Option 1, if it's written with this in
> mind.
> >>>
> >>> This does have some downsides:
> >>> - Sources are expanded twice. IO authors will need to avoid side
> effects in expansion methods, and any expensive logic will be executed
> twice.
> >>> - It adds a Java dependency for non-Java SDKs (It is cleanly separable
> though: if you don't want pushdown you don't need Java).
> >>>
> >>> I'm curious what others think about this approach. Is it tenable? Am I
> missing something?
> >>>
> >>> Thanks!
> >>> Brian
> >>>
> >>> [1]
> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
> >>> [2]
> https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
> >>>
> >>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com> wrote:
> >>>>
> >>>> Thanks, I made a pass over Option 2
> >>>>
> >>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com>
> wrote:
> >>>>>
> >>>>> Hi again everyone, thanks for all of your feedback. I've almost
> completely rewritten the document to include another design option and
> address more possibilities and challenges. Please take a look.
> >>>>>
> >>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com>
> wrote:
> >>>>>>
> >>>>>> Very happy to see we will have pushdown optimizations for java
> pipelines! Thanks for sharing the proposal.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> XInyu
> >>>>>>
> >>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Thanks Kyle, very promising. I left some comments.
> >>>>>>>
> >>>>>>> —
> >>>>>>> Alexey
> >>>>>>>
> >>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
> >>>>>>>
> >>>>>>> Thanks, I took a look at it.
> >>>>>>>
> >>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <kc...@google.com>
> wrote:
> >>>>>>>>
> >>>>>>>> Hi Beam devs,
> >>>>>>>>
> >>>>>>>> I'm back with another proposal involving Schema-based
> optimization in the Beam Java SDK. This one builds on the ideas in my
> previous proposal and is broader in scope. Please leave comments if this
> area is of interest to you.
> >>>>>>>>
> >>>>>>>>
> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
> >>>>>>>>
> >>>>>>>> Thank you,
> >>>>>>>> Kyle
> >>>>>>>
> >>>>>>>
>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Robert Bradshaw <ro...@google.com>.
The re-expansion idea is an interesting one. Let's call this Option 4.
The main question is how it would be invoked.

As well as adding complexity, I don't think it would work to do it as
part of the SDK's submission to the runner. For one thing, it'd be
best if we could just fire-and-forget, even writing the spec to disk
and then having the runner pick it up later, and we'd have to figure
out how to make this work with interactive and similar services that
act as runners but are really proxies for other runners. The other
problem is that the submitting SDK may not be the SDK that generated
the transform.

The other option would be to have some kind of a "pushdown service"
that would be referenced on the transform and could be invoked to do
the re-expansion. This would be a little bit like the environment
(though it would also get attached to composites); maybe a reference
to docker image that could get started up and communicated with?
(Maybe the SDK image itself could be used with an alternative
entrypoint? Assuming docker-on-docker works well...) Might get a bit
heavy-weight, but maybe we make it optional (though I'd rather it be
something you get by default).

Definitely worth expanding more.


My primary concern with Option 1 has been that it does not seem a
stepping stone for anything portable/multi-language, and as we'll
definitely want to support that (IMHO sooner rather than later) I
think whatever we build should go in that direction.

Note that this does not obviate the need to come up with some
language-agnostic way of defining the projections/predicates one wants
to perform, but it does let one indicate whether one supports a given
projection in code rather than having to come up with a language for
that as well. The primary advantage is that it allows one to act on
composites rather than at the level of primitives which is nice.
Whether this is necessary depends on how often full expansions (rather
than just the primitives) differ when a projection/predicate is pushed
down, and I have yet to come up with an example of that. (I think this
came up earlier, but our discussion could be informed by concrete
examples of 4-5 sources that support pushdown and provide sufficient
diversity.)

I do think Option 3 could be a stepping stone to Option 4, viewing it
as a mechanism for "simple" pushdowns to be done inline rather than a
via remote execution.

- Robert


On Wed, Sep 15, 2021 at 1:59 PM Kyle Weaver <kc...@google.com> wrote:
>
> Oops, I accidentally hit the send button...
>
> As I was saying, I think that's an excellent idea. I discussed this with Andrew previously and we came to a similar conclusion, that re-expansion is more flexible than annotations, especially when it comes to predicate pushdown.
>
> Robert made a prototype [1] of Option 2 in the Python SDK that should answer some of your questions about that option. Specifically, for your requirement d), the SDK worker could apply the pushdown before processing. Since SDK workers are shared across runners, that would mean the individual runners themselves would not have to re-implement pushdown. This approach has other drawbacks, though, which I have enumerated in my design doc.
>
> [1] https://github.com/apache/beam/pull/15351
>
> On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com> wrote:
>>
>> Hi Brian. I think that's an excele
>>
>> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <bh...@google.com> wrote:
>>>
>>> Thanks for the proposal Kyle! I wanted to write up some thoughts around the portable proposal (Option 2), and it grew well beyond the size of a doc comment.
>>>
>>> The basic requirements for this work, which are satisfied by Option 1, are:
>>> - to allow pushdown into *Java SDK* sources
>>> - based on field accesses in *Java SDK* transforms
>>> - when building a pipeline with the *Java SDK*.
>>>
>>> However down the road I would really like to see all of those requirements go from *Java SDK* to *any SDK*. It doesn't need to happen right away, but it would be great to avoid going down a path that's not amenable to such an approach in the future. So it's worth thinking about what the "any SDK" approach will look like now. (Presumably others feel this way as well, thus Option 2).
>>>
>>> Thinking about a portable version, we can break down these requirements further:
>>> - To support pushdown into *any SDK* sources we need
>>>   a) a portable way to define (or discover) what projections a source is capable of
>>>   b) a portable way to modify the source to perform those projections
>>> - To support tracking field accesses in *any SDK* transforms we need
>>>   c) a portable way to annotate field accesses (this isn't so bad, we know how to annotate transforms, but the devil is in the details I suppose).
>>> - To support pushdown when building a pipeline with *any SDK* we need
>>>   d) some SDK-agnostic code for applying the pushdown
>>>
>>> I think (a,b,c) are solvable, but they depend on (d), which is a big unknown (to me). It looks like Option 2 pursues putting this pushdown code in runners, and having them operate on the Runner API protos directly. I have a few concerns with this:
>>> - It precludes having complex logic for (a,b) in the SDKs. This was touched on in your document already [1]. I don't think it's that problematic for projection, but as we look forward to predicate pushdown I think we'll need to allow complex logic in the SDK for discovering supported predicates (e.g. see the logic for predicate discovery in BigQuery [2]).
>>> - Will individual runners be responsible for implementing this? We could put the logic in runners-core-construction-java to re-use it across JVM-based runners, but what about others? Does this just move the code duplication from the SDK side to the runner side?
>>> - Finally, Option 1 is not really a stepping stone to this approach.
>>>
>>> An alternative portable approach might be to have an intermediate cross-SDK and cross-runner "optimizer", which (optionally) runs between the SDK and the runner. It could send expansion requests back to SDKs to "re-expand" sources with projections (and later predicates) applied. This would address all of the above:
>>> - Since we make expansion requests to apply projections, the SDKs can implement complex logic for (a,b).
>>> - The pushdown code isn't in a runner or an SDK, we'd only need one optimizer implementation.
>>> - Option 1 could be a stepping stone to this approach. The optimizer could be implemented with code from Option 1, if it's written with this in mind.
>>>
>>> This does have some downsides:
>>> - Sources are expanded twice. IO authors will need to avoid side effects in expansion methods, and any expensive logic will be executed twice.
>>> - It adds a Java dependency for non-Java SDKs (It is cleanly separable though: if you don't want pushdown you don't need Java).
>>>
>>> I'm curious what others think about this approach. Is it tenable? Am I missing something?
>>>
>>> Thanks!
>>> Brian
>>>
>>> [1] https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
>>> [2] https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>>>
>>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>> Thanks, I made a pass over Option 2
>>>>
>>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com> wrote:
>>>>>
>>>>> Hi again everyone, thanks for all of your feedback. I've almost completely rewritten the document to include another design option and address more possibilities and challenges. Please take a look.
>>>>>
>>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com> wrote:
>>>>>>
>>>>>> Very happy to see we will have pushdown optimizations for java pipelines! Thanks for sharing the proposal.
>>>>>>
>>>>>> Thanks,
>>>>>> XInyu
>>>>>>
>>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <ar...@gmail.com> wrote:
>>>>>>>
>>>>>>> Thanks Kyle, very promising. I left some comments.
>>>>>>>
>>>>>>> —
>>>>>>> Alexey
>>>>>>>
>>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>> Thanks, I took a look at it.
>>>>>>>
>>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <kc...@google.com> wrote:
>>>>>>>>
>>>>>>>> Hi Beam devs,
>>>>>>>>
>>>>>>>> I'm back with another proposal involving Schema-based optimization in the Beam Java SDK. This one builds on the ideas in my previous proposal and is broader in scope. Please leave comments if this area is of interest to you.
>>>>>>>>
>>>>>>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>>>>>>>>
>>>>>>>> Thank you,
>>>>>>>> Kyle
>>>>>>>
>>>>>>>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Kyle Weaver <kc...@google.com>.
Oops, I accidentally hit the send button...

As I was saying, I think that's an excellent idea. I discussed this with
Andrew previously and we came to a similar conclusion, that re-expansion is
more flexible than annotations, especially when it comes to predicate
pushdown.

Robert made a prototype [1] of Option 2 in the Python SDK that should
answer some of your questions about that option. Specifically, for your
requirement d), the SDK worker could apply the pushdown before processing.
Since SDK workers are shared across runners, that would mean the individual
runners themselves would not have to re-implement pushdown. This approach
has other drawbacks, though, which I have enumerated in my design doc.

[1] https://github.com/apache/beam/pull/15351

On Wed, Sep 15, 2021 at 1:44 PM Kyle Weaver <kc...@google.com> wrote:

> Hi Brian. I think that's an excele
>
> On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <bh...@google.com>
> wrote:
>
>> Thanks for the proposal Kyle! I wanted to write up some thoughts around
>> the portable proposal (Option 2), and it grew well beyond the size of a doc
>> comment.
>>
>> The basic requirements for this work, which are satisfied by Option 1,
>> are:
>> - to allow pushdown into *Java SDK* sources
>> - based on field accesses in *Java SDK* transforms
>> - when building a pipeline with the *Java SDK*.
>>
>> However down the road I would really like to see all of those
>> requirements go from *Java SDK* to *any SDK*. It doesn't need to happen
>> right away, but it would be great to avoid going down a path that's not
>> amenable to such an approach in the future. So it's worth thinking about
>> what the "any SDK" approach will look like now. (Presumably others feel
>> this way as well, thus Option 2).
>>
>> Thinking about a portable version, we can break down these requirements
>> further:
>> - To support pushdown into *any SDK* sources we need
>>   a) a portable way to define (or discover) what projections a source is
>> capable of
>>   b) a portable way to modify the source to perform those projections
>> - To support tracking field accesses in *any SDK* transforms we need
>>   c) a portable way to annotate field accesses (this isn't so bad, we
>> know how to annotate transforms, but the devil is in the details I suppose).
>> - To support pushdown when building a pipeline with *any SDK* we need
>>   d) some SDK-agnostic code for applying the pushdown
>>
>> I think (a,b,c) are solvable, but they depend on (d), which is a big
>> unknown (to me). It looks like Option 2 pursues putting this pushdown code
>> in runners, and having them operate on the Runner API protos directly. I
>> have a few concerns with this:
>> - It precludes having complex logic for (a,b) in the SDKs. This was
>> touched on in your document already [1]. I don't think it's that
>> problematic for projection, but as we look forward to predicate pushdown I
>> think we'll need to allow complex logic in the SDK for discovering
>> supported predicates (e.g. see the logic for predicate discovery in
>> BigQuery [2]).
>> - Will individual runners be responsible for implementing this? We could
>> put the logic in runners-core-construction-java to re-use it across
>> JVM-based runners, but what about others? Does this just move the code
>> duplication from the SDK side to the runner side?
>> - Finally, Option 1 is not really a stepping stone to this approach.
>>
>> An alternative portable approach might be to have an intermediate
>> cross-SDK and cross-runner "optimizer", which (optionally) runs between the
>> SDK and the runner. It could send expansion requests back to SDKs to
>> "re-expand" sources with projections (and later predicates) applied. This
>> would address all of the above:
>> - Since we make expansion requests to apply projections, the SDKs can
>> implement complex logic for (a,b).
>> - The pushdown code isn't in a runner or an SDK, we'd only need one
>> optimizer implementation.
>> - Option 1 could be a stepping stone to this approach. The optimizer
>> could be implemented with code from Option 1, if it's written with this in
>> mind.
>>
>> This does have some downsides:
>> - Sources are expanded twice. IO authors will need to avoid side effects
>> in expansion methods, and any expensive logic will be executed twice.
>> - It adds a Java dependency for non-Java SDKs (It is cleanly separable
>> though: if you don't want pushdown you don't need Java).
>>
>> I'm curious what others think about this approach. Is it tenable? Am I
>> missing something?
>>
>> Thanks!
>> Brian
>>
>> [1]
>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
>> [2]
>> https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>>
>> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Thanks, I made a pass over Option 2
>>>
>>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com>
>>> wrote:
>>>
>>>> Hi again everyone, thanks for all of your feedback. I've almost
>>>> completely rewritten the document to include another design option and
>>>> address more possibilities and challenges. Please take a look.
>>>>
>>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Very happy to see we will have pushdown optimizations for java
>>>>> pipelines! Thanks for sharing the proposal.
>>>>>
>>>>> Thanks,
>>>>> XInyu
>>>>>
>>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>
>>>>>> Thanks Kyle, very promising. I left some comments.
>>>>>>
>>>>>> —
>>>>>> Alexey
>>>>>>
>>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>> Thanks, I took a look at it.
>>>>>>
>>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <kc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Beam devs,
>>>>>>>
>>>>>>> I'm back with another proposal involving Schema-based optimization
>>>>>>> in the Beam Java SDK. This one builds on the ideas in my previous proposal
>>>>>>> and is broader in scope. Please leave comments if this area is of interest
>>>>>>> to you.
>>>>>>>
>>>>>>>
>>>>>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Kyle
>>>>>>>
>>>>>>
>>>>>>

Re: [PROPOSAL] Projection pushdown in Beam Java

Posted by Kyle Weaver <kc...@google.com>.
Hi Brian. I think that's an excele

On Wed, Sep 15, 2021 at 11:10 AM Brian Hulette <bh...@google.com> wrote:

> Thanks for the proposal Kyle! I wanted to write up some thoughts around
> the portable proposal (Option 2), and it grew well beyond the size of a doc
> comment.
>
> The basic requirements for this work, which are satisfied by Option 1, are:
> - to allow pushdown into *Java SDK* sources
> - based on field accesses in *Java SDK* transforms
> - when building a pipeline with the *Java SDK*.
>
> However down the road I would really like to see all of those requirements
> go from *Java SDK* to *any SDK*. It doesn't need to happen right away, but
> it would be great to avoid going down a path that's not amenable to such an
> approach in the future. So it's worth thinking about what the "any SDK"
> approach will look like now. (Presumably others feel this way as well, thus
> Option 2).
>
> Thinking about a portable version, we can break down these requirements
> further:
> - To support pushdown into *any SDK* sources we need
>   a) a portable way to define (or discover) what projections a source is
> capable of
>   b) a portable way to modify the source to perform those projections
> - To support tracking field accesses in *any SDK* transforms we need
>   c) a portable way to annotate field accesses (this isn't so bad, we know
> how to annotate transforms, but the devil is in the details I suppose).
> - To support pushdown when building a pipeline with *any SDK* we need
>   d) some SDK-agnostic code for applying the pushdown
>
> I think (a,b,c) are solvable, but they depend on (d), which is a big
> unknown (to me). It looks like Option 2 pursues putting this pushdown code
> in runners, and having them operate on the Runner API protos directly. I
> have a few concerns with this:
> - It precludes having complex logic for (a,b) in the SDKs. This was
> touched on in your document already [1]. I don't think it's that
> problematic for projection, but as we look forward to predicate pushdown I
> think we'll need to allow complex logic in the SDK for discovering
> supported predicates (e.g. see the logic for predicate discovery in
> BigQuery [2]).
> - Will individual runners be responsible for implementing this? We could
> put the logic in runners-core-construction-java to re-use it across
> JVM-based runners, but what about others? Does this just move the code
> duplication from the SDK side to the runner side?
> - Finally, Option 1 is not really a stepping stone to this approach.
>
> An alternative portable approach might be to have an intermediate
> cross-SDK and cross-runner "optimizer", which (optionally) runs between the
> SDK and the runner. It could send expansion requests back to SDKs to
> "re-expand" sources with projections (and later predicates) applied. This
> would address all of the above:
> - Since we make expansion requests to apply projections, the SDKs can
> implement complex logic for (a,b).
> - The pushdown code isn't in a runner or an SDK, we'd only need one
> optimizer implementation.
> - Option 1 could be a stepping stone to this approach. The optimizer could
> be implemented with code from Option 1, if it's written with this in mind.
>
> This does have some downsides:
> - Sources are expanded twice. IO authors will need to avoid side effects
> in expansion methods, and any expensive logic will be executed twice.
> - It adds a Java dependency for non-Java SDKs (It is cleanly separable
> though: if you don't want pushdown you don't need Java).
>
> I'm curious what others think about this approach. Is it tenable? Am I
> missing something?
>
> Thanks!
> Brian
>
> [1]
> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit?disco=AAAAPFlTVTk
> [2]
> https://github.com/apache/beam/blob/f5afff17de0898bf0d2116e7757d89315f508cad/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryFilter.java#L117
>
> On Tue, Aug 17, 2021 at 10:44 AM Luke Cwik <lc...@google.com> wrote:
>
>> Thanks, I made a pass over Option 2
>>
>> On Mon, Aug 16, 2021 at 11:51 AM Kyle Weaver <kc...@google.com> wrote:
>>
>>> Hi again everyone, thanks for all of your feedback. I've almost
>>> completely rewritten the document to include another design option and
>>> address more possibilities and challenges. Please take a look.
>>>
>>> On Fri, Aug 6, 2021 at 12:05 PM Xinyu Liu <xi...@gmail.com> wrote:
>>>
>>>> Very happy to see we will have pushdown optimizations for java
>>>> pipelines! Thanks for sharing the proposal.
>>>>
>>>> Thanks,
>>>> XInyu
>>>>
>>>> On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>>
>>>>> Thanks Kyle, very promising. I left some comments.
>>>>>
>>>>> —
>>>>> Alexey
>>>>>
>>>>> On 5 Aug 2021, at 19:59, Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>> Thanks, I took a look at it.
>>>>>
>>>>> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver <kc...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Beam devs,
>>>>>>
>>>>>> I'm back with another proposal involving Schema-based optimization in
>>>>>> the Beam Java SDK. This one builds on the ideas in my previous proposal and
>>>>>> is broader in scope. Please leave comments if this area is of interest to
>>>>>> you.
>>>>>>
>>>>>>
>>>>>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>>>>>>
>>>>>> Thank you,
>>>>>> Kyle
>>>>>>
>>>>>
>>>>>