Posted to dev@beam.apache.org by Kenneth Knowles <kl...@google.com> on 2018/06/13 23:47:28 UTC

Re: SQL Filter Pushdowns in Apache Beam SQL

This has come up in a couple of in-person conversations. Pushing filtering
and projection into connectors is something we intend to do. Calcite's
optimizer is designed to support this; we just don't have it set up yet.
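
For a concrete sense of the kind of query involved, here is a minimal
sketch using the Java SDK's SQL entry point (SqlTransform in newer
releases; BeamSql.query in releases around the time of this thread). The
table and field names are made up for illustration:

    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // `orders` is a PCollection<Row> with a schema; "region" is a made-up
    // field. Today the WHERE clause runs as a transform inside the
    // pipeline; with pushdown it would become part of the connector's read.
    PCollection<Row> filtered = orders.apply(
        SqlTransform.query(
            "SELECT orderId, amount FROM PCOLLECTION WHERE region = 'EMEA'"));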

Your use case sounds like one that might test the limits of that, since the
JDBC read would occur before any windowing or before the result is set up as
a side input. I'd be curious what a Beam pipeline that does this without SQL
would look like.

Kenn

On Wed, Jun 13, 2018 at 8:47 AM Lukasz Cwik <lc...@google.com> wrote:

> It is currently the latter, where all the data is read and then filtered
> within the pipeline. Note that this doesn't mean that all the data is
> loaded into memory, since how the join is executed depends on the Runner
> that is powering the pipeline.
>
> Kenn had shared this doc[1], which starts to look at integrating Runners
> and IO into the SQL shell and at defining a way to map properties from SQL
> onto the IO connector; it seems natural that the filter would get pushed
> down to the IO connector as well. Please take a look and feel free to
> comment.
>
> 1:
> https://docs.google.com/document/d/1ZFVlnldrIYhUgOfxIT2JcmTFFSWTl4HwAnQsnwiNL1g/edit#heading=h.4zubkdp87wok
>
> On Wed, Jun 13, 2018 at 7:39 AM Harshvardhan Agrawal <
> harshvardhan.agr93@gmail.com> wrote:
>
>> Hi,
>>
>> We are currently playing with Apache Beam’s SQL extension on top of
>> Flink. One of the features we are interested in is the SQL predicate
>> pushdown feature that Spark provides. Does Beam support that?
>>
>> For example:
>> I have an unbounded dataset that I want to join with some static
>> reference data stored in a database. Will Beam perform the logic of
>> figuring out all the unique keys in the window and push it down to the
>> JDBC source, or will it bring all the data from the JDBC source into
>> memory and then perform the join?
>>
>> Thanks,
>> Harsh
>> --
>> Regards,
>> Harshvardhan
>>
>

Re: SQL Filter Pushdowns in Apache Beam SQL

Posted by Kenneth Knowles <kl...@google.com>.
I think I understand your use case better. Comments on those methods:

1) I think to make this work you would have to apply the filter before
converting it to a side input. So in that case pushdown is the question of
whether you use a Filter transform or do it in the JDBC query. Either way,
you will have to write the logic to figure out the keys you want. That
could be a moderately complex correlated subquery in SQL.
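
For illustration, a rough sketch of reading the filtered reference data
with JdbcIO and turning it into a side input. The connection details and
the table/column names are made up; the Filter-transform variant is noted
inline:

    import java.util.Map;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    JdbcIO.DataSourceConfiguration config =
        JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver", "jdbc:postgresql://host/reference_db");

    // (a) Push the filter into the JDBC query itself:
    PCollection<KV<String, String>> ref = pipeline.apply(
        JdbcIO.<KV<String, String>>read()
            .withDataSourceConfiguration(config)
            .withQuery("SELECT id, payload FROM ref_table WHERE region = 'EMEA'")
            .withRowMapper(rs -> KV.of(rs.getString(1), rs.getString(2)))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // (b) Or drop the WHERE clause above and filter inside the pipeline,
    // e.g. ref.apply(Filter.by(kv -> wantedKeys.contains(kv.getKey()))).

    // Either way, the filtered result becomes the side input:
    PCollectionView<Map<String, String>> refView = ref.apply(View.asMap());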

2) If you choose this route, you could use a stateful ParDo(DoFn) to batch
requests to the external data source.
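
A very rough sketch of that batching idea; lookupBatch() is a hypothetical
stand-in for the real client call, and a production version would also set
a timer to flush a partially filled buffer:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // State is per key and window, so route elements onto a small number
    // of shard keys to accumulate useful batch sizes.
    class BatchingLookupFn extends DoFn<KV<Integer, String>, KV<String, String>> {
      private static final int BATCH_SIZE = 100;

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec =
          StateSpecs.bag(StringUtf8Coder.of());

      @ProcessElement
      public void process(ProcessContext c,
          @StateId("buffer") BagState<String> buffer) {
        buffer.add(c.element().getValue());
        int buffered = 0;
        for (String ignored : buffer.read()) {
          buffered++;
        }
        if (buffered >= BATCH_SIZE) {
          // One round trip for the whole batch instead of one per element.
          for (Map.Entry<String, String> e :
              lookupBatch(buffer.read()).entrySet()) {
            c.output(KV.of(e.getKey(), e.getValue()));
          }
          buffer.clear();
        }
      }

      // Hypothetical: batch query against the external source (JDBC, REST).
      private Map<String, String> lookupBatch(Iterable<String> keys) {
        return Collections.emptyMap();
      }
    }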

Have you also considered this?

1a) Use CoGroupByKey and/or the Join library, passing the unbounded and
bounded data sets.
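
Very roughly, with made-up names (note the two inputs need compatible
windowing, so the unbounded side would be windowed first):

    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    final TupleTag<String> eventTag = new TupleTag<>();
    final TupleTag<String> refTag = new TupleTag<>();

    // keyedEvents: the windowed, unbounded stream; keyedRef: the bounded
    // reference data. Both are PCollection<KV<String, String>>.
    PCollection<KV<String, CoGbkResult>> joined =
        KeyedPCollectionTuple.of(eventTag, keyedEvents)
            .and(refTag, keyedRef)
            .apply(CoGroupByKey.create());

    // Downstream, a ParDo can combine result.getValue().getAll(eventTag)
    // with result.getValue().getAll(refTag) for each key.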

Again, none of these rely on having the entire JDBC data set in memory.

Kenn



On Wed, Jun 13, 2018 at 5:54 PM Harshvardhan Agrawal <
harshvardhan.agr93@gmail.com> wrote:

> I would assume that in the case where we don’t go the SQL route, we would
> have two options:
>
> 1) Store the reference data and supply it as a side input. This solution
> would not be feasible in cases where I have to join against, say, 10
> different datasets, since I don’t want to have that much data in memory.
>
> 2) Perform lookups for each value of the field I am joining on. This could
> make my pipeline really chatty with the external source. It is possible
> that the external source might not be able to handle the volume of
> requests, and the network could end up being a bottleneck.
>
>
> On Wed, Jun 13, 2018 at 19:47 Kenneth Knowles <kl...@google.com> wrote:
>
>> This has come up in a couple of in-person conversations. Pushing
>> filtering and projection into connectors is something we intend to do.
>> Calcite's optimizer is designed to support this; we just don't have it
>> set up yet.
>>
>> Your use case sounds like one that might test the limits of that, since
>> the JDBC read would occur before any windowing or before the result is
>> set up as a side input. I'd be curious what a Beam pipeline that does
>> this without SQL would look like.
>>
>> Kenn
>>
>> On Wed, Jun 13, 2018 at 8:47 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> It is currently the latter, where all the data is read and then filtered
>>> within the pipeline. Note that this doesn't mean that all the data is
>>> loaded into memory, since how the join is executed depends on the Runner
>>> that is powering the pipeline.
>>>
>>> Kenn had shared this doc[1], which starts to look at integrating Runners
>>> and IO into the SQL shell and at defining a way to map properties from
>>> SQL onto the IO connector; it seems natural that the filter would get
>>> pushed down to the IO connector as well. Please take a look and feel
>>> free to comment.
>>>
>>> 1:
>>> https://docs.google.com/document/d/1ZFVlnldrIYhUgOfxIT2JcmTFFSWTl4HwAnQsnwiNL1g/edit#heading=h.4zubkdp87wok
>>>
>>> On Wed, Jun 13, 2018 at 7:39 AM Harshvardhan Agrawal <
>>> harshvardhan.agr93@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are currently playing with Apache Beam’s SQL extension on top of
>>>> Flink. One of the features we are interested in is the SQL predicate
>>>> pushdown feature that Spark provides. Does Beam support that?
>>>>
>>>> For example:
>>>> I have an unbounded dataset that I want to join with some static
>>>> reference data stored in a database. Will Beam perform the logic of
>>>> figuring out all the unique keys in the window and push it down to the
>>>> JDBC source, or will it bring all the data from the JDBC source into
>>>> memory and then perform the join?
>>>>
>>>> Thanks,
>>>> Harsh
>>>> --
>>>> Regards,
>>>> Harshvardhan
>>>>
>>> --
> Regards,
> Harshvardhan
>

Re: SQL Filter Pushdowns in Apache Beam SQL

Posted by Harshvardhan Agrawal <ha...@gmail.com>.
I would assume that in the case where we don’t go the SQL route, we would
have two options:

1) Store the reference data and supply it as a side input. This solution
would not be feasible in cases where I have to join against, say, 10
different datasets, since I don’t want to have that much data in memory.
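
For concreteness, option 1 would look something like this (all names here
are made up):

    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // The entire reference dataset is materialized as a map side input.
    PCollectionView<Map<String, String>> refView =
        referenceData.apply(View.asMap());

    PCollection<String> enriched = events.apply(
        ParDo.of(new DoFn<KV<String, String>, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            String ref = c.sideInput(refView).get(c.element().getKey());
            c.output(c.element().getValue() + "," + ref);
          }
        }).withSideInputs(refView));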

2) Perform lookups for each value of the field I am joining on. This could
make my pipeline really chatty with the external source. It is possible
that the external source might not be able to handle the volume of
requests, and the network could end up being a bottleneck.
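
That is, a naive per-element lookup along these lines (the table name and
connection string are made up), which issues one query per element:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class NaiveLookupFn extends DoFn<KV<String, String>, KV<String, String>> {
      private transient Connection conn;

      @Setup
      public void setup() throws Exception {
        conn = DriverManager.getConnection("jdbc:postgresql://host/reference_db");
      }

      @ProcessElement
      public void process(ProcessContext c) throws Exception {
        // One round trip to the database for every incoming element.
        try (PreparedStatement stmt = conn.prepareStatement(
            "SELECT payload FROM ref_table WHERE id = ?")) {
          stmt.setString(1, c.element().getKey());
          try (ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
              c.output(KV.of(c.element().getKey(), rs.getString(1)));
            }
          }
        }
      }

      @Teardown
      public void teardown() throws Exception {
        if (conn != null) {
          conn.close();
        }
      }
    }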


On Wed, Jun 13, 2018 at 19:47 Kenneth Knowles <kl...@google.com> wrote:

> This has come up in a couple of in-person conversations. Pushing filtering
> and projection into connectors is something we intend to do. Calcite's
> optimizer is designed to support this; we just don't have it set up yet.
>
> Your use case sounds like one that might test the limits of that, since
> the JDBC read would occur before any windowing or before the result is
> set up as a side input. I'd be curious what a Beam pipeline that does
> this without SQL would look like.
>
> Kenn
>
> On Wed, Jun 13, 2018 at 8:47 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> It is currently the latter, where all the data is read and then filtered
>> within the pipeline. Note that this doesn't mean that all the data is
>> loaded into memory, since how the join is executed depends on the Runner
>> that is powering the pipeline.
>>
>> Kenn had shared this doc[1], which starts to look at integrating Runners
>> and IO into the SQL shell and at defining a way to map properties from
>> SQL onto the IO connector; it seems natural that the filter would get
>> pushed down to the IO connector as well. Please take a look and feel
>> free to comment.
>>
>> 1:
>> https://docs.google.com/document/d/1ZFVlnldrIYhUgOfxIT2JcmTFFSWTl4HwAnQsnwiNL1g/edit#heading=h.4zubkdp87wok
>>
>> On Wed, Jun 13, 2018 at 7:39 AM Harshvardhan Agrawal <
>> harshvardhan.agr93@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We are currently playing with Apache Beam’s SQL extension on top of
>>> Flink. One of the features we are interested in is the SQL predicate
>>> pushdown feature that Spark provides. Does Beam support that?
>>>
>>> For example:
>>> I have an unbounded dataset that I want to join with some static
>>> reference data stored in a database. Will Beam perform the logic of
>>> figuring out all the unique keys in the window and push it down to the
>>> JDBC source, or will it bring all the data from the JDBC source into
>>> memory and then perform the join?
>>>
>>> Thanks,
>>> Harsh
>>> --
>>> Regards,
>>> Harshvardhan
>>>
>> --
Regards,
Harshvardhan