Posted to user-zh@flink.apache.org by 夏帅 <jk...@dingtalk.com.INVALID> on 2020/07/08 07:40:41 UTC
Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Hello,
Check whether your code is structured like this:
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
......
tableEnv.execute("")
If so, try calling bsEnv.execute("") instead.
In 1.11 the implementations of these two execute methods have changed.
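A minimal sketch of the structure being suggested, assuming Flink 1.11 with the Blink planner on the classpath (the job name and the elided pipeline logic are placeholders; the import paths are those of the 1.11 Scala bridge):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object ExecuteSketch {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    // ... register tables / build the pipeline here ...

    // In 1.11, when the pipeline contains DataStream operators, call execute
    // on the StreamExecutionEnvironment, not on the table environment.
    bsEnv.execute("job name")
  }
}
```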
------------------------------------------------------------------
From: Zhou Zach <wa...@163.com>
Sent: Wednesday, July 8, 2020, 15:30
To: Flink user-zh mailing list <us...@flink.apache.org>
Subject: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
The code ran fine on Flink 1.10.1. After upgrading to 1.11.0, streamTableEnv.sqlUpdate was reported as deprecated, so I switched to executeSql. About two seconds after the program starts, it throws:
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
However, the data is written to HBase correctly. Is executeSql raising a false alarm?
query:
streamTableEnv.executeSql(
  """
    |CREATE TABLE `user` (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  -- 'connector.topic' = 'user',
    |  'connector.topic' = 'user_long',
    |  'connector.startup-mode' = 'latest-offset',
    |  'connector.properties.group.id' = 'user_flink',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
streamTableEnv.executeSql(
  """
    |CREATE TABLE user_hbase3 (
    |  rowkey BIGINT,
    |  cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
    |) WITH (
    |  'connector.type' = 'hbase',
    |  'connector.version' = '2.1.0',
    |  'connector.table-name' = 'user_hbase2',
    |  'connector.zookeeper.znode.parent' = '/hbase',
    |  'connector.write.buffer-flush.max-size' = '10mb',
    |  'connector.write.buffer-flush.max-rows' = '1000',
    |  'connector.write.buffer-flush.interval' = '2s'
    |)
    |""".stripMargin)
streamTableEnv.executeSql(
  """
    |insert into user_hbase3
    |SELECT uid,
    |       ROW(sex, age, created_time) as cf
    |FROM (select uid, sex, age, cast(created_time as VARCHAR) as created_time from `user`)
    |""".stripMargin)
Re: Re: Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Posted by WeiXubin <18...@163.com>.
Thanks.
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Posted by godfrey he <go...@gmail.com>.
hi,
There is currently no workaround: the job name of an INSERT job is generated automatically from the sink table name.
For a future fix, follow https://issues.apache.org/jira/browse/FLINK-18545
Weixubin <18...@163.com> wrote on Thursday, July 23, 2020, 18:07:
> Hi,
> I'd like to ask: with streamExecutionEnv.execute("from kafka sink hbase") you can specify the job name,
> but when switching to streamTableEnv.executeSql(sql) there seems to be no way to set the job name.
> Is there any workaround?
Re:Re: Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Posted by Weixubin <18...@163.com>.
Hi,
I'd like to ask: with streamExecutionEnv.execute("from kafka sink hbase") you can specify the job name,
but when switching to streamTableEnv.executeSql(sql) there seems to be no way to set the job name.
Is there any workaround?
Re: Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Posted by godfrey he <go...@gmail.com>.
There is already an issue for this: https://issues.apache.org/jira/browse/FLINK-18545, please follow it.
WeiXubin <18...@163.com> wrote on Thursday, July 23, 2020, 18:00:
> Hi,
> I'd like to ask: with streamExecutionEnv.execute("from kafka sink hbase") you can specify the job name,
> but when using streamTableEnv.executeSql(sql) there seems to be no way to set the job name.
> Is there any solution? Thanks.
Reply: Re: Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Posted by 夏帅 <jk...@dingtalk.com.INVALID>.
Thanks.
Re: Re: Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Posted by godfrey he <go...@gmail.com>.
1.11 adjusted how StreamTableEnvironment.execute()
and StreamExecutionEnvironment.execute() run jobs.
In short:
1. StreamTableEnvironment.execute() can only run jobs built with the sqlUpdate and insertInto methods;
2. once a Table has been converted to a DataStream, the job can only be run via StreamExecutionEnvironment.execute();
3. the newly introduced TableEnvironment.executeSql() runs the SQL job directly
(submitting it asynchronously), so there is no need to also call StreamTableEnvironment.execute()
or StreamExecutionEnvironment.execute().
See [1] [2] for details.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
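Under these 1.11 rules, a pure-SQL pipeline needs no explicit execute() call at all. A sketch under that assumption (DDL bodies and table names are placeholders, not a complete runnable program):

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, settings)

tEnv.executeSql("CREATE TABLE src (...) WITH (...)")  // DDL executes immediately
tEnv.executeSql("CREATE TABLE snk (...) WITH (...)")
// executeSql on an INSERT submits the job asynchronously;
// no tEnv.execute() or env.execute() call should follow.
tEnv.executeSql("INSERT INTO snk SELECT * FROM src")
```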
Best,
Godfrey
Zhou Zach <wa...@163.com> wrote on Wednesday, July 8, 2020, 16:19:
> Removing that call fixed it. Thanks for the explanation.
Re:Re: Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Posted by Zhou Zach <wa...@163.com>.
Removing that call fixed it. Thanks for the explanation.
Re: Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Posted by Jingsong Li <ji...@gmail.com>.
Hi,
In your code, streamTableEnv.executeSql already submits the job to the cluster for asynchronous execution.
So the later streamExecutionEnv.execute("from kafka sink hbase") has no physical operators behind it.
You no longer need to call it.
Best,
Jingsong
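Because executeSql submits the INSERT job asynchronously, a caller that needs to block until the job finishes (for example with bounded test input) can, as one possible approach in 1.11, use the JobClient exposed by the returned TableResult. The table names come from the thread; the query is simplified and the wait logic is a hedged sketch:

```scala
// executeSql returns a TableResult; for INSERT statements it carries a
// JobClient handle for the submitted job.
val result = streamTableEnv.executeSql(
  "INSERT INTO user_hbase3 SELECT uid, ROW(sex, age, created_time) as cf FROM `user`")

result.getJobClient.ifPresent { client =>
  // Block until the job reaches a terminal state (assumes bounded input;
  // a streaming job reading Kafka would block indefinitely here).
  client.getJobExecutionResult(getClass.getClassLoader).get()
}
```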
Re:Reply: flink SQL 1.11 executeSql reports "No operators defined in streaming topology"
Posted by Zhou Zach <wa...@163.com>.
I changed the code structure to this:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
streamExecutionEnv.execute("from kafka sink hbase")
It still fails with the same error.