Posted to user@flink.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/03/25 03:12:09 UTC

Where can I find MySQL retract stream table sink Java source code?

I created one table on Kafka and another on MySQL using Flink SQL,
then wrote a SQL statement that reads from Kafka and writes to MySQL:

INSERT INTO mysqlTable  SELECT status, COUNT(order_no) AS num FROM
                        (SELECT order_no, LAST_VALUE(status) AS status FROM kafkaTable GROUP BY order_no)
                        GROUP BY status

I think this is a retract stream. But where can I find the Java source code for the MySQL retract table sink?

Thanks,
Lei



wanglei2@geekplus.com.cn 
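
To see why the query above produces updates rather than only appends, here is a minimal Python sketch that models its two nested aggregations in memory. It is a simplified illustration, not Flink's planner output: the function name `status_count_changelog`, the order numbers, and the status values "CREATED" and "PAID" are all made up for the example, and the sketch assumes a result row is removed once its count drops to zero.

```python
from collections import Counter


def status_count_changelog(events):
    """Model the changes to the result of
    SELECT status, COUNT(order_no) ... GROUP BY status
    computed over the latest status of each order."""
    last_status = {}    # inner query: order_no -> LAST_VALUE(status)
    counts = Counter()  # outer query: status -> COUNT(order_no)
    changes = []
    for order_no, status in events:
        old = last_status.get(order_no)
        if old == status:
            continue  # the latest status did not change, so nothing is emitted
        if old is not None:
            # The order leaves its old status group.
            counts[old] -= 1
            if counts[old] == 0:
                del counts[old]
                changes.append(("delete", old, None))
            else:
                changes.append(("upsert", old, counts[old]))
        # The order joins its new status group.
        last_status[order_no] = status
        counts[status] += 1
        changes.append(("upsert", status, counts[status]))
    return changes


# Hypothetical input: two orders move from CREATED to PAID.
events = [("o1", "CREATED"), ("o2", "CREATED"), ("o1", "PAID"), ("o2", "PAID")]
changes = status_count_changelog(events)
```

Once the last order leaves the "CREATED" group, the sketch emits a delete for that key, which is the kind of change that would remove a row from the MySQL table.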


Re: Re: Where can I find MySQL retract stream table sink Java source code?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
It seems to be here:  https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
There is no JDBCRetractTableSink, only append and upsert sinks.
I am confused about why the MySQL record can be deleted.

Thanks,
Lei 


wanglei2@geekplus.com.cn 

 

Re: Re: Where can I find MySQL retract stream table sink Java source code?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks Jingsong.

When this SQL executes, records in the MySQL table can be deleted, so I guess it is a retract stream.
I would like to find the exact Java code that is generated for it and have a look.

Thanks,
Lei




wanglei2@geekplus.com.cn
 

Re: Re: Where can I find MySQL retract stream table sink Java source code?

Posted by Jingsong Li <ji...@gmail.com>.
That is not quite what it means.

There are two kinds of queries: append queries and update queries.

An update query can be handled in two ways: one is retract, the other is
upsert.
So a sink can choose which mode it uses to handle an update query.
Supporting just one of them is enough.

You can read more in [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html

Best,
Jingsong Lee
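
As a rough illustration of the two modes described above, the following Python sketch encodes the same change to a result row once in retract style and once in upsert style. It follows the conceptual description in the dynamic tables documentation [1] and is not Flink code: the function names `encode_retract` and `encode_upsert` and the tuple layouts are assumptions made for the example.

```python
def encode_retract(old_row, new_row):
    """Retract mode: an update is a retract message for the old row
    followed by an add message for the new row. No key is needed."""
    messages = []
    if old_row is not None:
        messages.append((False, old_row))  # retract message
    if new_row is not None:
        messages.append((True, new_row))   # add message
    return messages


def encode_upsert(key, old_row, new_row):
    """Upsert mode: an update is a single upsert message for a unique key;
    removing a row is a delete message for that key."""
    if new_row is None:
        return [("delete", key)]
    return [("upsert", key, new_row)]


# Hypothetical change: the count for status "CREATED" goes from 2 to 1.
retract_messages = encode_retract(("CREATED", 2), ("CREATED", 1))
upsert_messages = encode_upsert("CREATED", ("CREATED", 2), ("CREATED", 1))
```

The same update costs two messages in retract mode and one in upsert mode, but upsert mode depends on the result having a unique key.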


Re: Re: Where can I find MySQL retract stream table sink Java source code?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks Jingsong.

So JDBCTableSink now supports append and upsert modes, and retract mode is not available yet. Is that right?

Thanks,
Lei 




wanglei2@geekplus.com.cn 

 

Re: Where can I find MySQL retract stream table sink Java source code?

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

Maybe you have misunderstood the upsert sink. Take a look at [1]; an upsert
sink can deal with "delete" records.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion

Best,
Jingsong Lee
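
To make the point about "delete" records concrete, here is a small Python sketch of how a sink for a keyed table might apply an upsert stream. It uses an in-memory SQLite database as a stand-in for MySQL and is not the Flink JDBC connector's code: the function name `apply_upsert_stream`, the message tuples, and the choice of `status` as the primary key are assumptions for illustration.

```python
import sqlite3


def apply_upsert_stream(conn, messages):
    """Apply upsert and delete messages to a table keyed by `status`."""
    for message in messages:
        kind = message[0]
        if kind == "upsert":
            _, status, num = message
            # Insert the row, or overwrite the existing row with the same key.
            conn.execute(
                "INSERT OR REPLACE INTO mysqlTable (status, num) VALUES (?, ?)",
                (status, num),
            )
        elif kind == "delete":
            # A delete message removes the row for that key.
            conn.execute("DELETE FROM mysqlTable WHERE status = ?", (message[1],))
        else:
            raise ValueError(f"unknown message type: {kind!r}")
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mysqlTable (status TEXT PRIMARY KEY, num INTEGER)")

# Hypothetical upsert stream: the CREATED group empties out and is deleted.
apply_upsert_stream(conn, [
    ("upsert", "CREATED", 1),
    ("upsert", "PAID", 1),
    ("delete", "CREATED"),
    ("upsert", "PAID", 2),
])
rows = conn.execute("SELECT status, num FROM mysqlTable ORDER BY status").fetchall()
```

In this model no retract sink is needed for rows to disappear from the target table; the delete message of the upsert stream is enough.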


Re: Where can I find MySQL retract stream table sink Java source code?

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

This can be an upsert stream [1], and JDBC has an upsert sink now [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector

Best,
Jingsong Lee


Re: Where can I find MySQL retract stream table sink Java source code?

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

This can be an upsert stream [1]

Best,
Jingsong Lee
