You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Andrew Chan (Jira)" <ji...@apache.org> on 2022/07/15 13:02:00 UTC

[jira] [Updated] (FLINK-28572) FlinkSQL executes Table.execute multiple times on the same Table, and changing the Table.execute code position will produce different phenomena

     [ https://issues.apache.org/jira/browse/FLINK-28572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Chan updated FLINK-28572:
--------------------------------
    Description: 
*The following code prints and inserts fine*

public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("create table sensor(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's1', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'properties.group.id' = 'atguigu', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'format' = 'csv'" +
")");
Table result = tEnv.sqlQuery("select * from sensor");

tEnv.executeSql("create table s_out(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's2', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'format' = 'json', " +
" 'sink.partitioner' = 'round-robin' " +
")");
{color:#FF0000}result.executeInsert("s_out");{color}
{color:#FF0000} result.execute().print();{color}

}

---

*When the code that prints this line is moved up, it can be printed normally, but the insert statement is invalid, as follows*

public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("create table sensor(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's1', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'properties.group.id' = 'atguigu', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'format' = 'csv'" +
")");

Table result = tEnv.sqlQuery("select * from sensor");

tEnv.executeSql("create table s_out(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's2', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'format' = 'json', " +
" 'sink.partitioner' = 'round-robin' " +
")");
{color:#ff0000}result.execute().print();{color}
{color:#ff0000} result.executeInsert("s_out");{color}
}

  was:
*The following code prints and inserts fine*

public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("create table sensor(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's1', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'properties.group.id' = 'atguigu', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'format' = 'csv'" +
")");
Table result = tEnv.sqlQuery("select * from sensor");
tEnv.executeSql("create table s_out(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's2', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'format' = 'json', " +
" 'sink.partitioner' = 'round-robin' " +
")");
result.executeInsert("s_out");
result.execute().print();
}

 

*When the code that prints this line is moved up, it can be printed normally, but the insert statement is invalid, as follows*

public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 1. 通过ddl方式建表(动态表), 与文件关联
tEnv.executeSql("create table sensor(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's1', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'properties.group.id' = 'atguigu', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'format' = 'csv'" +
")");


Table result = tEnv.sqlQuery("select * from sensor");

tEnv.executeSql("create table s_out(" +
" id string, " +
" ts bigint, " +
" vc int" +
")with(" +
" 'connector' = 'kafka', " +
" 'topic' = 's2', " +
" 'properties.bootstrap.servers' = 'hadoop162:9092', " +
" 'format' = 'json', " +
" 'sink.partitioner' = 'round-robin' " +
")");
{color:#FF0000}result.execute().print();{color}
{color:#FF0000} result.executeInsert("s_out");{color}
}


> FlinkSQL executes Table.execute multiple times on the same Table, and changing the Table.execute code position will produce different phenomena
> -----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28572
>                 URL: https://issues.apache.org/jira/browse/FLINK-28572
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.13.6
>         Environment: flink-table-planner-blink_2.11  
> 1.13.6
>            Reporter: Andrew Chan
>            Priority: Major
>
> *The following code prints and inserts fine*
> public static void main(String[] args) {
> Configuration conf = new Configuration();
> conf.setInteger("rest.port", 2000);
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
> env.setParallelism(1);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> tEnv.executeSql("create table sensor(" +
> " id string, " +
> " ts bigint, " +
> " vc int" +
> ")with(" +
> " 'connector' = 'kafka', " +
> " 'topic' = 's1', " +
> " 'properties.bootstrap.servers' = 'hadoop162:9092', " +
> " 'properties.group.id' = 'atguigu', " +
> " 'scan.startup.mode' = 'latest-offset', " +
> " 'format' = 'csv'" +
> ")");
> Table result = tEnv.sqlQuery("select * from sensor");
> tEnv.executeSql("create table s_out(" +
> " id string, " +
> " ts bigint, " +
> " vc int" +
> ")with(" +
> " 'connector' = 'kafka', " +
> " 'topic' = 's2', " +
> " 'properties.bootstrap.servers' = 'hadoop162:9092', " +
> " 'format' = 'json', " +
> " 'sink.partitioner' = 'round-robin' " +
> ")");
> {color:#FF0000}result.executeInsert("s_out");{color}
> {color:#FF0000} result.execute().print();{color}
> }
> ---
> *When the code that prints this line is moved up, it can be printed normally, but the insert statement is invalid, as follows*
> public static void main(String[] args) {
> Configuration conf = new Configuration();
> conf.setInteger("rest.port", 2000);
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
> env.setParallelism(1);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> tEnv.executeSql("create table sensor(" +
> " id string, " +
> " ts bigint, " +
> " vc int" +
> ")with(" +
> " 'connector' = 'kafka', " +
> " 'topic' = 's1', " +
> " 'properties.bootstrap.servers' = 'hadoop162:9092', " +
> " 'properties.group.id' = 'atguigu', " +
> " 'scan.startup.mode' = 'latest-offset', " +
> " 'format' = 'csv'" +
> ")");
> Table result = tEnv.sqlQuery("select * from sensor");
> tEnv.executeSql("create table s_out(" +
> " id string, " +
> " ts bigint, " +
> " vc int" +
> ")with(" +
> " 'connector' = 'kafka', " +
> " 'topic' = 's2', " +
> " 'properties.bootstrap.servers' = 'hadoop162:9092', " +
> " 'format' = 'json', " +
> " 'sink.partitioner' = 'round-robin' " +
> ")");
> {color:#ff0000}result.execute().print();{color}
> {color:#ff0000} result.executeInsert("s_out");{color}
> }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)