Posted to user-zh@flink.apache.org by me <me...@lishiyu.cn> on 2020/10/14 02:09:19 UTC

Flink SQL registers a Kafka-mapped table and streams the data into Hive; fails with: No operators defined in streaming topology. Cannot generate StreamGraph.

Version: Flink 1.11
Scenario: Flink SQL registers a Kafka-mapped table and writes the data into Hive in real time.
Error: No operators defined in streaming topology. Cannot generate StreamGraph.


Code:
val flinkKafkaSqlSouce: String =
 s"""create table slog(
 |`f1` string,
 |`f2` string,
 |`f3` string,
 |`f4` string,
 |`f5` string,
 |collect_date string
 |) with (
 | 'connector' = 'kafka',
 | 'topic' = 'kafka_table',
 | 'properties.bootstrap.servers' = '${kafkaHost}',
 | 'properties.group.id' = 'DwdSecurityLogToHive',
 | 'format' = 'json',
 | 'scan.startup.mode' = 'earliest-offset'
 |)
 |""”.stripMargin
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql(flinkKafkaSqlSouce)
println(tableEnv.getCurrentCatalog +"."+tableEnv.getCurrentDatabase)
HiveUtils.initHiveCatalog("tsgz","catalogName", tableEnv) // our own utility class; it just registers the Hive catalog
//tableEnv.registerTable("security_log", kafkaData)
//println(kafkaData.toString)
tableEnv.executeSql(s"insert into table tsgz.dwd_security_log select * from default_catalog.default_database.security_log" )
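For context, a hedged note on how this error typically arises in Flink 1.11: `TableEnvironment.executeSql("INSERT ...")` submits its own job to the cluster, so the `StreamExecutionEnvironment` itself ends up with no operators. If the main method additionally calls `env.execute()` (not shown in the code above, so this is an assumption), that second call fails with exactly this message. A minimal sketch:

```scala
// Hedged sketch, not the poster's full program. Assumes a tableEnv
// bound to a StreamExecutionEnvironment `env`, and that the source
// and sink tables already exist.
val tableResult = tableEnv.executeSql(
  "insert into tsgz.dwd_security_log " +
    "select * from default_catalog.default_database.security_log")

// executeSql() on an INSERT submits the job on its own and returns
// immediately; the job handle is available via
// tableResult.getJobClient if you need to track it.

// Do NOT also call env.execute() when the pipeline is driven entirely
// by executeSql(...) statements -- with no DataStream operators defined,
// that call raises "No operators defined in streaming topology."
```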