You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/03/25 03:12:09 UTC
Where can i find MySQL retract stream table sink java soure code?
Create one table with kafka, another table with MySQL using flinksql.
Write a sql to read from kafka and write to MySQL.
INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM
(SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY statusI think this is a retract stream. But where can i find the java source code about MySQL retract table sink?
Thanks,Lei
wanglei2@geekplus.com.cn
Re: Re: Where can i find MySQL retract stream table sink java soure code?
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Seems it is here: https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
There's no JDBCRetractTableSink, only append and upsert.
I am confused why the MySQL record can be deleted.
Thanks,
Lei
wanglei2@geekplus.com.cn
Sender: wanglei2@geekplus.com.cn
Send Time: 2020-03-25 11:25
Receiver: Jingsong Li
cc: user
Subject: Re: Re: Where can i find MySQL retract stream table sink java soure code?
Thanks Jingsong.
When executing this sql, the mysql table record can be deleted. So i guess it is a retract stream.
I want to know the exactly java code it is generated and have a look at it.
Thanks,
Lei
wanglei2@geekplus.com.cn
Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wanglei2@geekplus.com.cn
cc: user
Subject: Re: Where can i find MySQL retract stream table sink java soure code?
Hi,
This can be a upsert stream [1]
Best,
Jingsong Lee
On Wed, Mar 25, 2020 at 11:12 AM wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> wrote:
Create one table with kafka, another table with MySQL using flinksql.
Write a sql to read from kafka and write to MySQL.
INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM
(SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY statusI think this is a retract stream. But where can i find the java source code about MySQL retract table sink?
Thanks,Lei
wanglei2@geekplus.com.cn
--
Best, Jingsong Lee
Re: Re: Where can i find MySQL retract stream table sink java soure code?
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks Jingsong.
When executing this sql, the mysql table record can be deleted. So i guess it is a retract stream.
I want to know the exactly java code it is generated and have a look at it.
Thanks,
Lei
wanglei2@geekplus.com.cn
Sender: Jingsong Li
Send Time: 2020-03-25 11:14
Receiver: wanglei2@geekplus.com.cn
cc: user
Subject: Re: Where can i find MySQL retract stream table sink java soure code?
Hi,
This can be a upsert stream [1]
Best,
Jingsong Lee
On Wed, Mar 25, 2020 at 11:12 AM wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> wrote:
Create one table with kafka, another table with MySQL using flinksql.
Write a sql to read from kafka and write to MySQL.
INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM
(SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY statusI think this is a retract stream. But where can i find the java source code about MySQL retract table sink?
Thanks,Lei
wanglei2@geekplus.com.cn
--
Best, Jingsong Lee
Re: Re: Where can i find MySQL retract stream table sink java soure code?
Posted by Jingsong Li <ji...@gmail.com>.
It is not the mean what you said.
There are two queries: append query and update query.
For update query, there are two ways to handle, one is retract, another is
upsert.
So the thing is a sink can choose a mode to handle update query. Just
choose one is OK.
You could read more in [1].
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html
Best,
Jingsong Lee
On Wed, Mar 25, 2020 at 11:57 AM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:
> Thanks Jingsong.
>
> So JDBCTableSink now suport append and upsert mode. Retract mode not
> available yet. It is right?
>
> Thanks,
> Lei
>
>
> ------------------------------
> wanglei2@geekplus.com.cn
>
>
> *Sender:* Jingsong Li <ji...@gmail.com>
> *Send Time:* 2020-03-25 11:39
> *Receiver:* wanglei2@geekplus.com.cn
> *cc:* user <us...@flink.apache.org>
> *Subject:* Re: Where can i find MySQL retract stream table sink java
> soure code?
> Hi,
>
> Maybe you have some misunderstanding to upsert sink. You can take a look
> to [1], it can deal with "delete" records.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li <ji...@gmail.com>
> wrote:
>
>> Hi,
>>
>> This can be a upsert stream [1], and JDBC has upsert sink now [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li <ji...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> This can be a upsert stream [1]
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Wed, Mar 25, 2020 at 11:12 AM wanglei2@geekplus.com.cn <
>>> wanglei2@geekplus.com.cn> wrote:
>>>
>>>>
>>>> Create one table with kafka, another table with MySQL using flinksql.
>>>> Write a sql to read from kafka and write to MySQL.
>>>>
>>>> INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM
>>>> (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
>>>> GROUP BY status
>>>>
>>>> I think this is a retract stream.
>>>>
>>>> But where can i find the java source code about MySQL retract table sink?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Lei
>>>>
>>>>
>>>> ------------------------------
>>>> wanglei2@geekplus.com.cn
>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best, Jingsong Lee
>
>
--
Best, Jingsong Lee
Re: Re: Where can i find MySQL retract stream table sink java soure code?
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks Jingsong.
So JDBCTableSink now suport append and upsert mode. Retract mode not available yet. It is right?
Thanks,
Lei
wanglei2@geekplus.com.cn
Sender: Jingsong Li
Send Time: 2020-03-25 11:39
Receiver: wanglei2@geekplus.com.cn
cc: user
Subject: Re: Where can i find MySQL retract stream table sink java soure code?
Hi,
Maybe you have some misunderstanding to upsert sink. You can take a look to [1], it can deal with "delete" records.
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
Best,
Jingsong Lee
On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li <ji...@gmail.com> wrote:
Hi,
This can be a upsert stream [1], and JDBC has upsert sink now [2].
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
[2]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
Best,
Jingsong Lee
On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li <ji...@gmail.com> wrote:
Hi,
This can be a upsert stream [1]
Best,
Jingsong Lee
On Wed, Mar 25, 2020 at 11:12 AM wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> wrote:
Create one table with kafka, another table with MySQL using flinksql.
Write a sql to read from kafka and write to MySQL.
INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM
(SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
GROUP BY statusI think this is a retract stream. But where can i find the java source code about MySQL retract table sink?
Thanks,Lei
wanglei2@geekplus.com.cn
--
Best, Jingsong Lee
--
Best, Jingsong Lee
--
Best, Jingsong Lee
Re: Where can i find MySQL retract stream table sink java soure code?
Posted by Jingsong Li <ji...@gmail.com>.
Hi,
Maybe you have some misunderstanding to upsert sink. You can take a look to
[1], it can deal with "delete" records.
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
Best,
Jingsong Lee
On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li <ji...@gmail.com> wrote:
> Hi,
>
> This can be a upsert stream [1], and JDBC has upsert sink now [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li <ji...@gmail.com>
> wrote:
>
>> Hi,
>>
>> This can be a upsert stream [1]
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Mar 25, 2020 at 11:12 AM wanglei2@geekplus.com.cn <
>> wanglei2@geekplus.com.cn> wrote:
>>
>>>
>>> Create one table with kafka, another table with MySQL using flinksql.
>>> Write a sql to read from kafka and write to MySQL.
>>>
>>> INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM
>>> (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
>>> GROUP BY status
>>>
>>> I think this is a retract stream.
>>>
>>> But where can i find the java source code about MySQL retract table sink?
>>>
>>>
>>> Thanks,
>>>
>>> Lei
>>>
>>>
>>> ------------------------------
>>> wanglei2@geekplus.com.cn
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best, Jingsong Lee
>
--
Best, Jingsong Lee
Re: Where can i find MySQL retract stream table sink java soure code?
Posted by Jingsong Li <ji...@gmail.com>.
Hi,
This can be a upsert stream [1], and JDBC has upsert sink now [2].
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
Best,
Jingsong Lee
On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li <ji...@gmail.com> wrote:
> Hi,
>
> This can be a upsert stream [1]
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 25, 2020 at 11:12 AM wanglei2@geekplus.com.cn <
> wanglei2@geekplus.com.cn> wrote:
>
>>
>> Create one table with kafka, another table with MySQL using flinksql.
>> Write a sql to read from kafka and write to MySQL.
>>
>> INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM
>> (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
>> GROUP BY status
>>
>> I think this is a retract stream.
>>
>> But where can i find the java source code about MySQL retract table sink?
>>
>>
>> Thanks,
>>
>> Lei
>>
>>
>> ------------------------------
>> wanglei2@geekplus.com.cn
>>
>>
>
> --
> Best, Jingsong Lee
>
--
Best, Jingsong Lee
Re: Where can i find MySQL retract stream table sink java soure code?
Posted by Jingsong Li <ji...@gmail.com>.
Hi,
This can be a upsert stream [1]
Best,
Jingsong Lee
On Wed, Mar 25, 2020 at 11:12 AM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:
>
> Create one table with kafka, another table with MySQL using flinksql.
> Write a sql to read from kafka and write to MySQL.
>
> INSERT INTO mysqlTable SELECT status, COUNT(order_no) AS num FROM
> (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
> GROUP BY status
>
> I think this is a retract stream.
>
> But where can i find the java source code about MySQL retract table sink?
>
>
> Thanks,
>
> Lei
>
>
> ------------------------------
> wanglei2@geekplus.com.cn
>
>
--
Best, Jingsong Lee