You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 陶浩然 <ta...@hdec.com> on 2023/03/01 09:19:19 UTC

metrics.latency.interval指标应该如何查看?

我使用的 Flink 版本是 1.14.0,在 flink-conf.yaml 里添加了 metrics.latency.interval 配置,
但是在 Web UI 中没有找到这个指标。

请问是哪里出了问题?
该 Flink 任务从 Kafka 中读取数据并写入 MySQL,代码如下:


public class FlinkSqlTask {
&nbsp; &nbsp; public static void main(String[] args) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("10.215.142.175", 7080);
&nbsp; &nbsp; &nbsp; &nbsp; env.getConfig().setParallelism(1);
&nbsp; &nbsp; &nbsp; &nbsp; EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
&nbsp; &nbsp; &nbsp; &nbsp; StreamTableEnvironment tableEnvironment =StreamTableEnvironment.create(env, streamSettings);
&nbsp; &nbsp; &nbsp; &nbsp; tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
&nbsp; &nbsp; &nbsp; &nbsp; StatementSet stmtSet = tableEnvironment.createStatementSet();
&nbsp; &nbsp; &nbsp; &nbsp; String originSql = "create table class0 (id int,name string) with ('connector'='kafka','topic'='test.280.91.test.class','properties.bootstrap.servers'='10.215.142.175:9092','debezium-json.schema-include'='true','scan.startup.mode' = 'latest-offset','properties.group.id'='test.280.91-dbhistory','format'='debezium-json')";
&nbsp; &nbsp; &nbsp; &nbsp; String targetSql = "create table class_test (id int,name string,primary key(id) NOT ENFORCED) with ('connector'='jdbc','url'='jdbc:mysql://10.215.142.98:43306/test?allowMultiQueries=true&amp;useUnicode=true&amp;characterEncoding=UTF-8&amp;useSSL=false&amp;serverTimezone=GMT%2B8&amp;allowPublicKeyRetrieval=true','username'='root','password' ='ecidi@2019+Ecidi@2019','table-name'='class_test')";
&nbsp; &nbsp; &nbsp; &nbsp; String insertSql = "insert into class_test(id,name) select id,name from class0 where 1=1";
&nbsp; &nbsp; &nbsp; &nbsp; tableEnvironment.executeSql(originSql);
&nbsp; &nbsp; &nbsp; &nbsp; tableEnvironment.executeSql(targetSql);
&nbsp; &nbsp; &nbsp; &nbsp; stmtSet.addInsertSql(insertSql);
&nbsp; &nbsp; &nbsp; &nbsp; stmtSet.execute();
&nbsp; &nbsp; }
}