Posted to user@beam.apache.org by Yushu Yao <ya...@gmail.com> on 2022/01/10 20:15:09 UTC

Calcite/BeamSql

Hi Folks,

Question from a Newbie for both Calcite and Beam:

I understand that Calcite can build an execution plan as a tree of relational
algebra operations and push certain operations down to a "data source". At the
same time, it allows source-specific optimizations.

I also understand that Beam SQL can run SqlTransform.query() on one or more
PCollection<Row>s, and that Calcite is used to come up with the execution
plan.

My question is: assume I have a MySQL table called Table1 and a Kafka stream
called "Kafka".

Now I want to do a join, like looking up a row based on a key in the Kafka
message:
select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key

What's the best way to implement this with Beam SQL? (Note that we can't
hardcode the join because each input Kafka message may need a different SQL
query.)

One step further: suppose we have two MySQL tables, Table1 and Table2, and a
Kafka stream "Kafka", and we want to join those two tables inside MySQL first
(maybe with aggregations like sum/count) and then join the result with the
Kafka stream. Is there a way to tap into Calcite so that the join of the two
tables is actually pushed down into MySQL?

Sorry for the lengthy question, and please let me know if more clarification
is needed.

Thanks a lot in advance!

-Yushu

Re: Calcite/BeamSql

Posted by Yushu Yao <ya...@gmail.com>.
OK, got it. JdbcIO.readRows() is what I'm looking for.
Thanks!


On Tue, Jan 11, 2022 at 2:47 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> If I understand your problem right, you can just use JdbcIO.readRows(),
> which returns a PCollection<Row> and can be used downstream to create
> a PCollectionTuple, which, in its turn, already contains another
> PCollection<Row> from your Kafka source. So, once you have
> a PCollectionTuple with two TupleTags (from Kafka and MySql), you can apply
> SqlTransform over it.
>
> —
> Alexey

Re: Calcite/BeamSql

Posted by Alexey Romanenko <ar...@gmail.com>.
If I understand your problem right, you can just use JdbcIO.readRows(), which
returns a PCollection<Row>. That can be combined downstream into a
PCollectionTuple together with the PCollection<Row> coming from your Kafka
source. Once you have a PCollectionTuple with two TupleTags (one for Kafka,
one for MySQL), you can apply SqlTransform over it.
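
A minimal sketch of what that could look like (the connection settings, the
table name, and the kafkaRows collection are assumptions for illustration,
not something from this thread):

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

// Assumes an existing Pipeline `pipeline` and a PCollection<Row> `kafkaRows`
// already built from KafkaIO (see the Kafka sketch later in the thread).
PCollection<Row> mysqlRows =
    pipeline.apply("ReadTable1",
        JdbcIO.readRows()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/mydb")
                    .withUsername("user")
                    .withPassword("password"))
            .withQuery("SELECT * FROM Table1"));

// The TupleTag ids become the table names visible to the SQL query.
PCollection<Row> joined =
    PCollectionTuple.of(new TupleTag<Row>("Table1"), mysqlRows)
        .and(new TupleTag<Row>("Kafka"), kafkaRows)
        .apply(SqlTransform.query(
            "select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key"));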

—
Alexey




> On 11 Jan 2022, at 03:54, Yushu Yao <ya...@gmail.com> wrote:
> 
> Thanks, Brian for the explanation. That helps a lot. 
> Now I'm clear on the Kafka source side. 
> 
> A follow-up on the other source that's in MySql. If I want to do the query:
> select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key 
> 
> I can get the Kafka stream into a PCollection as you said above. 
> How about the MySql Table 1? Is there some semantic in Beam that allows me to make the MySql table into a PCollection? (Or do I need to import it as a PCollection? I think there is a Beam SQL Extension for it?) And does it need to scan the full MySql Table1 to accomplish the above join? 
> 
> Thanks again!
> -Yushu


Re: Calcite/BeamSql

Posted by Yushu Yao <ya...@gmail.com>.
Thanks, Brian, for the explanation. That helps a lot.
Now I'm clear on the Kafka source side.

A follow-up on the other source, which is in MySQL. If I want to do the query:
select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key

I can get the Kafka stream into a PCollection as you said above.
What about the MySQL Table1? Is there some mechanism in Beam that allows me
to turn the MySQL table into a PCollection? (Or do I need to import it as a
PCollection myself? I think there is a Beam SQL extension for it?) And does
it need to scan the full MySQL Table1 to accomplish the above join?

Thanks again!
-Yushu


On Mon, Jan 10, 2022 at 1:50 PM Brian Hulette <bh...@google.com> wrote:

> Hi Yushu,
> Thanks for the questions! To process Kafka data with SqlTransform you have
> a couple of options, you could just use KafkaIO and manually transforms the
> records to produce a PCollection with a Schema [1], or you could use the
> DDL to describe your kafka stream as a table [2], and query it directly
> with SqlTransform. You can find examples of using the DDL with SqlTransform
> here [3]. Note that the Kafka DDL supports "Generic Payload Handling", so
> you should be able to configure it to consume JSON, proto, thrift, or avro
> messages [4]. Would one of those work for you?
>
> For your second question about "pushing down" the join on 2 tables:
> unfortunately, that's not something we support right now. You'd have to do
> that sort of optimization manually. This is something we've discussed in
> the abstract but it's a ways off.
>
> Brian
>
> [1]
> https://beam.apache.org/documentation/programming-guide/#what-is-a-schema
> [2]
> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#kafka
> [3]
> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/extensions/sql/SqlTransform.html
> [4]
> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#generic-payload-handling

Re: Calcite/BeamSql

Posted by Brian Hulette <bh...@google.com>.
Hi Yushu,
Thanks for the questions! To process Kafka data with SqlTransform you have
a couple of options: you can use KafkaIO and manually transform the records
to produce a PCollection with a schema [1], or you can use the DDL to
describe your Kafka stream as a table [2] and query it directly with
SqlTransform. You can find examples of using the DDL with SqlTransform
here [3]. Note that the Kafka DDL supports "Generic Payload Handling", so
you should be able to configure it to consume JSON, proto, thrift, or avro
messages [4]. Would one of those work for you?
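
A rough sketch of the first option (the topic name, bootstrap servers, and
the two-field schema are placeholders, not anything from this thread):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringDeserializer;

// Declare the schema that SqlTransform will see for the Kafka "table".
Schema kafkaSchema = Schema.builder()
    .addStringField("key")
    .addStringField("payload")
    .build();

// Read raw records and manually convert each one into a schema'd Row.
// Assumes an existing Pipeline `pipeline`.
PCollection<Row> kafkaRows =
    pipeline
        .apply("ReadKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("my-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply("ToRows",
            MapElements.into(TypeDescriptors.rows())
                .via(kv -> Row.withSchema(kafkaSchema)
                    .addValues(kv.getKey(), kv.getValue())
                    .build()))
        .setRowSchema(kafkaSchema);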

For your second question about "pushing down" the join on 2 tables:
unfortunately, that's not something we support right now. You'd have to do
that sort of optimization manually. This is something we've discussed in
the abstract but it's a ways off.
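
One way to do that manual pushdown today is to write the MySQL-side join (and
any aggregation) into the query you hand to JdbcIO, so only the pre-joined
result enters the pipeline. A sketch, with made-up table and column names and
an assumed JdbcIO.DataSourceConfiguration `mysqlConfig`:

// Let MySQL execute the Table1/Table2 join and aggregation itself; Beam only
// reads the result. Table and column names here are placeholders.
PCollection<Row> preJoined =
    pipeline.apply("ReadJoinedTables",
        JdbcIO.readRows()
            .withDataSourceConfiguration(mysqlConfig)
            .withQuery(
                "SELECT t1.id AS id, SUM(t2.amount) AS total "
                    + "FROM Table1 t1 JOIN Table2 t2 ON t1.id = t2.table1_id "
                    + "GROUP BY t1.id"));

// preJoined can then go into a PCollectionTuple together with the Kafka
// PCollection<Row> and be joined via SqlTransform, as in the earlier sketch.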

Brian

[1]
https://beam.apache.org/documentation/programming-guide/#what-is-a-schema
[2]
https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#kafka
[3]
https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/extensions/sql/SqlTransform.html
[4]
https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#generic-payload-handling

On Mon, Jan 10, 2022 at 12:15 PM Yushu Yao <ya...@gmail.com> wrote:

> Hi Folks,
>
> Question from a Newbie for both Calcite and Beam:
>
> I understand Calcite can make a tree of execution plan with relational
> algebra and push certain operations to a "data source". And at the same
> time, it can allow source-specific optimizations.
>
> I also understand that Beam SQL can run SqlTransform.query() on one or
> more of the PCollection<Row>, and Calcite is used in coming up with the
> execution plan.
>
> My question is, assume I have a MySql Table as Table1, and a Kafka Stream
> called "Kafka".
>
> Now I want to do some joins like lookuping up a row based on a key in the
> Kafka message:
> select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key
>
> What's the best way to implement this with beamSQL. (Note that we can't
> hardcode the join because each input Kafka message may need a different
> SQL).
>
> One step further, if we have 2 MySql Tables, Table1, and Table2. And a
> Kafka Stream "Kafka". And we want to join those 2 tables inside MySql first
> (and maybe with aggregations like sum/count), then join with the Kafka. Is
> there a way to tap into calcite so that the join of the 2 tables are
> actually pushed into MySql?
>
> Sorry for the lengthy question and please let me know if more
> clarifications is needed.
>
> Thanks a lot in advanced!
>
> -Yushu
>
>
>
>