Posted to user@flink.apache.org by po...@gmx.com on 2022/09/15 14:10:15 UTC

INSERT INTO will work faster in Flink than in regular database?

What's the most efficient way (performance-wise) to update a large number of rows?
Presumably it will be something like "INSERT INTO table (column1) SELECT column1 FROM ...". In any case, I don't see any "UPDATE" in Flink?
But sometimes SQL alone is not enough.
Suppose I have this code:
 
TableResult tableResult1 = tEnv.executeSql("SELECT * FROM SomeTable");
try (org.apache.flink.util.CloseableIterator<Row> it = tableResult1.collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        // Treat row:
        String x_field = row.getField("some_column").toString();
        // Do something with x_field
        // ...
        tEnv.executeSql("INSERT INTO AnotherTable (column) VALUES ('new_value')");
    }
}

But this per-row INSERT will probably be a performance killer...

Any suggestions on how to do this in a smart way?

Mike

Re: Re: INSERT INTO will work faster in Flink than in regular database?

Posted by Martijn Visser <ma...@apache.org>.
I see. As mentioned, Flink uses Dynamic Tables, which are a logical
concept. They don't necessarily (fully) materialize during query execution.
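
For illustration, a minimal sketch of that set-based alternative to the row
loop from the original question (tEnv, SomeTable and AnotherTable are reused
from the question; UPPER and the column names are stand-ins for the real
per-field logic):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// One statement instead of one INSERT per row: the client-side collect()
// loop disappears and Flink plans and runs the whole thing as a single job.
TableResult result = tEnv.executeSql(
    "INSERT INTO AnotherTable (column1) " +
    "SELECT UPPER(some_column) FROM SomeTable");
result.await(); // optional: blocks until the batch job finishes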

On Tue, Sep 27, 2022 at 1:35 PM <po...@gmx.com> wrote:

>
> I was wondering whether I can use Flink to process large files and save the
> result to another file or to a database (JDBC).
> So: load the file into a Flink table, then loop over the rows, inserting
> the results into some temporary Flink table.
> Then save the result to a JDBC table or file.

Re: Re: INSERT INTO will work faster in Flink than in regular database?

Posted by po...@gmx.com.
I was wondering whether I can use Flink to process large files and save the result to another file or to a database (JDBC).
So: load the file into a Flink table, then loop over the rows, inserting the results into some temporary Flink table.
Then save the result to a JDBC table or file.
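
A minimal sketch of that pipeline without the row loop and the temp table,
assuming a CSV input file and a MySQL target; every path, URL and name below
is invented for the example, and flink-connector-jdbc plus the JDBC driver
must be on the classpath:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Source: the large file, exposed as a table via the filesystem connector.
tEnv.executeSql(
    "CREATE TABLE file_source (" +
    "  some_column STRING" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'file:///tmp/input'," +   // hypothetical path
    "  'format' = 'csv'" +
    ")");

// Sink: the target database table via the JDBC connector.
tEnv.executeSql(
    "CREATE TABLE jdbc_sink (" +
    "  result_column STRING" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +   // hypothetical URL
    "  'table-name' = 'result_table'" +
    ")");

// One set-based statement replaces the read-loop-insert cycle; the JDBC
// sink batches its writes instead of issuing one statement per row.
tEnv.executeSql(
    "INSERT INTO jdbc_sink SELECT UPPER(some_column) FROM file_source");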

 


Re: Re: INSERT INTO will work faster in Flink than in regular database?

Posted by Martijn Visser <ma...@apache.org>.
Hi,

What is your use case? Flink doesn't have an UPDATE statement, because
Flink relies on Dynamic Tables and Continuous Queries. [1]

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/concepts/dynamic_tables/
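
As a rough illustration of what that means in practice (table, column and
connector choices below are invented; datagen is only a demo source): the
result of a grouping query over a dynamic table is itself continuously
updated, so the engine performs the updates that an UPDATE statement would
otherwise do.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

tEnv.executeSql(
    "CREATE TABLE Orders (" +
    "  user_id BIGINT," +
    "  amount  DOUBLE" +
    ") WITH (" +
    "  'connector' = 'datagen'," +
    "  'rows-per-second' = '5'" +
    ")");

// The result row per user_id is continuously updated, not appended:
// print() shows a changelog with +I (insert) and -U/+U (update) rows.
tEnv.executeSql(
    "SELECT user_id, COUNT(*) AS cnt, SUM(amount) AS total " +
    "FROM Orders GROUP BY user_id")
    .print();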


Re: Re: INSERT INTO will work faster in Flink than in regular database?

Posted by Xuyang <xy...@163.com>.
Hi, you're right, there is no UPDATE keyword in Flink.

--

    Best!
    Xuyang

Re: INSERT INTO will work faster in Flink than in regular database?

Posted by po...@gmx.com.
Thank you - I'll try.

There is no 'UPDATE' clause in Flink SQL?

Re: INSERT INTO will work faster in Flink than in regular database?

Posted by Shengkai Fang <fs...@gmail.com>.
Hi. I think you can write a UDF [1] to process some fields and then insert
into the sink.

Best.
Shengkai

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
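
A minimal sketch of that suggestion against the tables from the original
question (the Normalize class and its trim/upper-case logic are invented
for the example):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

// A scalar UDF holding the per-field "do something with x_field" logic.
public static class Normalize extends ScalarFunction {
    public String eval(String s) {
        return s == null ? null : s.trim().toUpperCase();
    }
}

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.createTemporarySystemFunction("NORMALIZE", Normalize.class);

// The per-field logic now runs inside one set-based INSERT instead of a
// Java loop that submits one INSERT job per row.
tEnv.executeSql(
    "INSERT INTO AnotherTable (column1) " +
    "SELECT NORMALIZE(some_column) FROM SomeTable");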
