You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by izual <iz...@163.com> on 2020/11/02 03:28:10 UTC

How to use both of SQL and DataStream in 1.11

Hi, community:




  We used flink 1.9.1, both SQL and DataStream API to support multiple sinks for product envs.

  For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and table.toAppendStream[Row].addSink(new RichSinkFunction[Row] {...}).name("dest2"), and env.execute() to submit the DAG together, and result will sink to dest1 or dest2 or both.




  Now I try to update flink to 1.11.2, according to [1]-Attention,use tableEnv.execute() instead of env.execute(), but only get the result of `sqlUpdate`, and result of `DataStream.addSink` is missed.




  1. How to get both the results in mixed SQL/DataStream use cases, maybe change all RichSinkFunction into a UpsertTable works. Is there another simple way to do this?

  2. It seems like env.getExecutionPlan only returns DAG of DataStream API now, so how to get the whole DAG like env.getExecutionPlan() in 1.9.1.




  Thanks for ur reply.

Re: Re: How to use both of SQL and DataStream in 1.11

Posted by Danny Chan <da...@apache.org>.
You can still use the .sqlQuery(...) to create a common table there, then
converts the table into a DataStream,

with this DataStream, you can add the multiple sink functions you like.

izual <iz...@163.com> 于2020年11月2日周一 下午5:18写道:

> Hi, Danny:
>
>
> Thanks for your help.
>
>
> As in the question, some result was saved using DataStream API:
>
>
> ```
>
> table.toAppendStream[Row].addSink(new MyStreamSink)
>
>
> class MyStreamSink extends RichSinkFunction[Row] {
>
> override def invoke(r: Row): Unit = {
>
> // save result
>
> }
>
> }
>
> ```
>
>
> So if use `StatementSet.addInsert`, should I must use
> UpsertStreamTableSink and StreamTableSourceFactory to wrap the
> RichSinkFunction?
>
> Is there a way to keep using DataStream API in table environment? which is
> more expressive.
>
>
>
>
>
>
> At 2020-11-02 16:53:22, "Danny Chan" <da...@apache.org> wrote:
>
> You can still convert the datastream to table and register it with method
>
> void TableEnvironment.createTemporaryView(String path, Table view)
>
> Then create a StatementSet with
>
> StatementSet TableEnvironment.createStatementSet(),
>
> With the StatementSet, you can execute multiple insert statements
> altogether,
> and then submit the job with
>
> TableResult StatementSet.execute()
>
> izual <iz...@163.com> 于2020年11月2日周一 上午11:28写道:
>
>> Hi, community:
>>
>>
>>   We used flink 1.9.1, both SQL and DataStream API to support multiple
>> sinks for product envs.
>>
>>   For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and
>> table.toAppendStream[Row].addSink(new RichSinkFunction[Row]
>> {...}).name("dest2"), and env.execute() to submit the DAG together, and
>> result will sink to dest1 or dest2 or both.
>>
>>
>>   Now I try to update flink to 1.11.2, according to [1]-Attention,use
>> tableEnv.execute() instead of env.execute(), but only get the result of
>> `sqlUpdate`, and result of `DataStream.addSink` is missed.
>>
>>
>>   1. How to get both the results in mixed SQL/DataStream use cases, maybe
>> change all RichSinkFunction into a UpsertTable works. Is there another
>> simple way to do this?
>>
>>   2. It seems like env.getExecutionPlan only returns DAG of DataStream
>> API now, so how to get the whole DAG like env.getExecutionPlan() in 1.9.1.
>>
>>
>>   Thanks for ur reply.
>>
>>
>>
>>
>
>
>
>

Re:Re: How to use both of SQL and DataStream in 1.11

Posted by izual <iz...@163.com>.
Hi, Danny:




Thanks for your help.




As in the question, some result was saved using DataStream API:




```

table.toAppendStream[Row].addSink(new MyStreamSink)




class MyStreamSink extends RichSinkFunction[Row] {

override def invoke(r: Row): Unit = {

// save result

}

}

```




So if use `StatementSet.addInsert`, should I must use UpsertStreamTableSink and StreamTableSourceFactory to wrap the RichSinkFunction?

Is there a way to keep using DataStream API in table environment? which is more expressive.
















At 2020-11-02 16:53:22, "Danny Chan" <da...@apache.org> wrote:

You can still convert the datastream to table and register it with method

void TableEnvironment.createTemporaryView(String path, Table view)

Then create a StatementSet with

StatementSet TableEnvironment.createStatementSet(),

With the StatementSet, you can execute multiple insert statements altogether,
and then submit the job with

TableResult StatementSet.execute()



izual <iz...@163.com> 于2020年11月2日周一 上午11:28写道:


Hi, community:




  We used flink 1.9.1, both SQL and DataStream API to support multiple sinks for product envs.

  For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and table.toAppendStream[Row].addSink(new RichSinkFunction[Row] {...}).name("dest2"), and env.execute() to submit the DAG together, and result will sink to dest1 or dest2 or both.




  Now I try to update flink to 1.11.2, according to [1]-Attention,use tableEnv.execute() instead of env.execute(), but only get the result of `sqlUpdate`, and result of `DataStream.addSink` is missed.




  1. How to get both the results in mixed SQL/DataStream use cases, maybe change all RichSinkFunction into a UpsertTable works. Is there another simple way to do this?

  2. It seems like env.getExecutionPlan only returns DAG of DataStream API now, so how to get the whole DAG like env.getExecutionPlan() in 1.9.1.




  Thanks for ur reply.





 

Re: How to use both of SQL and DataStream in 1.11

Posted by Danny Chan <da...@apache.org>.
You can still convert the datastream to table and register it with method

void TableEnvironment.createTemporaryView(String path, Table view)

Then create a StatementSet with

StatementSet TableEnvironment.createStatementSet(),

With the StatementSet, you can execute multiple insert statements
altogether,
and then submit the job with

TableResult StatementSet.execute()

izual <iz...@163.com> 于2020年11月2日周一 上午11:28写道:

> Hi, community:
>
>
>   We used flink 1.9.1, both SQL and DataStream API to support multiple
> sinks for product envs.
>
>   For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and
> table.toAppendStream[Row].addSink(new RichSinkFunction[Row]
> {...}).name("dest2"), and env.execute() to submit the DAG together, and
> result will sink to dest1 or dest2 or both.
>
>
>   Now I try to update flink to 1.11.2, according to [1]-Attention,use
> tableEnv.execute() instead of env.execute(), but only get the result of
> `sqlUpdate`, and result of `DataStream.addSink` is missed.
>
>
>   1. How to get both the results in mixed SQL/DataStream use cases, maybe
> change all RichSinkFunction into a UpsertTable works. Is there another
> simple way to do this?
>
>   2. It seems like env.getExecutionPlan only returns DAG of DataStream API
> now, so how to get the whole DAG like env.getExecutionPlan() in 1.9.1.
>
>
>   Thanks for ur reply.
>
>
>
>