You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Andrew Chan (Jira)" <ji...@apache.org> on 2022/07/15 13:00:02 UTC
[jira] [Created] (FLINK-28572) FlinkSQL executes Table.execute multiple times on the same Table, and changing the Table.execute code position will produce different phenomena
Andrew Chan created FLINK-28572:
-----------------------------------
Summary: FlinkSQL executes Table.execute multiple times on the same Table, and changing the Table.execute code position will produce different phenomena
Key: FLINK-28572
URL: https://issues.apache.org/jira/browse/FLINK-28572
Project: Flink
Issue Type: Bug
Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.13.6
Environment: flink-table-planner-blink_2.11
1.13.6
Reporter: Andrew Chan
*The following code prints and inserts fine*
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("create table sensor(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's1', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'properties.group.id' = 'atguigu', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'format' = 'csv'" +
")");
Table result = tEnv.sqlQuery("select * from sensor");
tEnv.executeSql("create table s_out(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's2', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'format' = 'json', " +
" 'sink.partitioner' = 'round-robin' " +
")");
result.executeInsert("s_out");
result.execute().print();
}
*When the code that prints this line is moved up, it can be printed normally, but the insert statement is invalid, as follows*
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 1. 通过ddl方式建表(动态表), 与文件关联
tEnv.executeSql("create table sensor(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's1', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'properties.group.id' = 'atguigu', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'format' = 'csv'" +
")");
Table result = tEnv.sqlQuery("select * from sensor");
tEnv.executeSql("create table s_out(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's2', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'format' = 'json', " +
" 'sink.partitioner' = 'round-robin' " +
")");
{color:#FF0000}result.execute().print();{color}
{color:#FF0000} result.executeInsert("s_out");{color}
}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)