Posted to user@flink.apache.org by John Tipper <jo...@hotmail.com> on 2022/04/29 13:07:40 UTC
Multiple INSERT INTO within single PyFlink job?
Hi all,
Is it possible to have more than one `INSERT INTO ... SELECT ...` statement within a single PyFlink job (on Flink 1.13.6)?
I have a number of output tables that I create, and I am trying to write to these within a single job, where the example SQL looks like (assume there is an input table called 'input'):
sql1 = "INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input"
sql2 = "INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input"
env.execute_sql(sql1)
env.execute_sql(sql2)
When this is run on a Flink cluster in Kinesis Data Analytics on AWS, I get a failure: "Cannot have more than one execute() or executeAsync() call in a single environment". When I look at the Flink web UI, I can see that there is one job called "insert-into_default_catalog.default_database.out1". Does Flink separate each INSERT statement into its own job? It looks like it creates one job for the first query and then fails when it tries to create a second job for the second query.
Is there any way of getting it to run as a single job using SQL, without having to move away from SQL and the Table API?
Many thanks,
John
Re: Multiple INSERT INTO within single PyFlink job?
Posted by John Tipper <jo...@hotmail.com>.
Ah, found it: I need to create a statement set and use add_insert_sql() to group multiple INSERT statements into a single job:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/insert/#insert-statement
https://aws.amazon.com/blogs/big-data/build-a-real-time-streaming-application-using-apache-flink-python-api-with-amazon-kinesis-data-analytics/
Sorry for the noise.