Posted to dev@flink.apache.org by Tamas Kiss <ta...@cloudera.com.INVALID> on 2022/08/04 13:44:01 UTC

Reworking use of TableEnvironment due to API change in 1.15

Hi Experts,

We are trying to upgrade to Flink 1.15 and have to refactor our code because
the following method has been removed from TableEnvironment: void
sqlUpdate(String sql). In our code we mix the SQL and DataStream APIs: we
use the sqlUpdate method to buffer several insert statements, and we add
other inserts by obtaining and configuring a retractStream from the same
TableEnvironment object. We then call tableEnv.executeAsync() in a later
step, and all the previous inserts run in one Flink job.

This FLIP
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878>
suggests using the following pattern instead of sqlUpdate:

createStatementSet() -> StatementSet -> execute()

This basically works for the inserts defined via sqlUpdate, but the others,
which are managed through the DataStream API, are not executed.

I was considering the following workarounds, but all of them have issues:
1. Replacing sqlUpdate with executeSql does not work, because executeSql
submits the job immediately and does not allow buffering multiple inserts.
2. Replacing the calls to the DataStream API with the SQL API. This can work
but requires further investigation and seems to be a lot of work at the moment.
3. Keeping both and calling statementSet.execute() and
tableEnv.executeAsync() results in two separate jobs, which would be hard
to manage.

So at the moment the only way I can think of is to use StatementSet and mix
it with calls to the DataStream API. Is there any way to make this
scenario work and have only one Flink job executed?

Thanks
Tamas

Re: Reworking use of TableEnvironment due to API change in 1.15

Posted by Jark Wu <im...@gmail.com>.
Hi Tamas,

I think `StreamStatementSet.attachAsDataStream()` is what you are looking
for.
Here is detailed documentation. Please take a look:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#adding-table-api-pipelines-to-datastream-api
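
For reference, here is a minimal sketch of that pattern, assuming Flink 1.15
with flink-table-api-java-bridge and a planner on the classpath. The table
names (src, sink_a, sink_b), the class name and the job name are hypothetical,
and the built-in datagen and blackhole connectors only stand in for real
sources and sinks. It uses toChangelogStream where your code uses a retract
stream; that is a substitution on my side, not a requirement.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class MixedPipelineJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical tables; DDL via executeSql does not submit a job.
        tableEnv.executeSql(
                "CREATE TABLE src (id INT, name STRING) WITH ("
                        + "'connector' = 'datagen', 'number-of-rows' = '10')");
        tableEnv.executeSql(
                "CREATE TABLE sink_a (id INT, name STRING) WITH ("
                        + "'connector' = 'blackhole')");
        tableEnv.executeSql(
                "CREATE TABLE sink_b (id INT) WITH ('connector' = 'blackhole')");

        // Buffer the inserts that previously went through sqlUpdate.
        StreamStatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO sink_a SELECT id, name FROM src");
        statementSet.addInsertSql("INSERT INTO sink_b SELECT id FROM src");

        // Attach the buffered inserts to the StreamExecutionEnvironment
        // instead of calling statementSet.execute(), so no job is submitted here.
        statementSet.attachAsDataStream();

        // DataStream part of the pipeline, built on the same environment.
        Table counts =
                tableEnv.sqlQuery("SELECT name, COUNT(*) AS cnt FROM src GROUP BY name");
        DataStream<Row> changelog = tableEnv.toChangelogStream(counts);
        changelog.print();

        // A single submission covers both the SQL inserts and the DataStream part.
        env.execute("mixed-sql-and-datastream");
    }
}
```

With this layout the statement set is never executed on its own, so
env.execute() (or env.executeAsync()) should be the only call that submits a job.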

Best,
Jark


Re: Reworking use of TableEnvironment due to API change in 1.15

Posted by Tamas Kiss <ta...@cloudera.com.INVALID>.
Hi,

One correction to the above: in both places where I mentioned
tableEnv.executeAsync, I was referring to StreamExecutionEnvironment's
executeAsync. Sorry for the typo.

Any thoughts on the issue?

Thanks
Tamas
