Posted to dev@flink.apache.org by "hehuiyuan (Jira)" <ji...@apache.org> on 2020/06/05 06:56:00 UTC
[jira] [Created] (FLINK-18145) Segment optimization does not work ?
hehuiyuan created FLINK-18145:
---------------------------------
Summary: Segment optimization does not work ?
Key: FLINK-18145
URL: https://issues.apache.org/jira/browse/FLINK-18145
Project: Flink
Issue Type: Wish
Reporter: hehuiyuan
DAG Segment Optimization: !image-2020-06-05-14-40-03-123.png|width=569,height=226!
Code:
{code:java}
StreamExecutionEnvironment env = EnvUtil.getEnv();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,bsSettings);
GeneratorTableSource tableSource = new GeneratorTableSource(2, 1, 70, 0);
tableEnv.registerTableSource("myTble",tableSource);
Table mytable = tableEnv.scan("myTble");
mytable.printSchema();
// Pipeline 1: convert the source table to an append stream and print it.
tableEnv.toAppendStream(mytable,Row.class).addSink(new PrintSinkFunction<>()).setParallelism(2);
// Pipeline 2: count per key over a 30-second processing-time tumbling window.
Table tableproc = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey,TUMBLE_START(proctime, INTERVAL '30' SECOND) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '30' SECOND) ,key");
tableproc.printSchema();
tableEnv.registerTable("t4",tableproc);
// Pipeline 3: count per key over a 24-hour processing-time tumbling window.
Table table = tableEnv.sqlQuery("SELECT key,count(rowtime_string) as countkey,TUMBLE_START(proctime, INTERVAL '24' HOUR) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '24' HOUR) ,key");
table.printSchema();
tableEnv.registerTable("t3",table);
String[] fields = new String[]{"key","countkey","tumblestart"};
TypeInformation[] fieldsType = new TypeInformation[3];
fieldsType[0] = Types.INT;
fieldsType[1] = Types.LONG;
fieldsType[2] = Types.SQL_TIMESTAMP;
PrintTableUpsertSink printTableSink = new PrintTableUpsertSink(fields,fieldsType,true);
tableEnv.registerTableSink("inserttable",printTableSink);
tableEnv.sqlUpdate("insert into inserttable select key,countkey,tumblestart from t3");
String[] fieldsproc = new String[]{"key","countkey","tumblestart"};
TypeInformation[] fieldsTypeproc = new TypeInformation[3];
fieldsTypeproc[0] = Types.INT;
fieldsTypeproc[1] = Types.LONG;
fieldsTypeproc[2] = Types.SQL_TIMESTAMP;
PrintTableUpsertSink printTableSinkproc = new PrintTableUpsertSink(fieldsproc,fieldsTypeproc,true);
tableEnv.registerTableSink("inserttableproc",printTableSinkproc);
tableEnv.sqlUpdate("insert into inserttableproc select key,countkey,tumblestart from t4");
{code}
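I have a custom table source that feeds three pipelines:
(1) convert the table to a DataStream with the `toAppendStream` method, then sink it
(2) aggregate over a 30-second tumbling window, then sink the result
(3) aggregate over a 24-hour tumbling window, then sink the result
However, the segment optimization did not take effect.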
*!image-2020-06-05-14-50-33-759.png|width=458,height=336!*
*The source is executed by 3 threads, so the same data is generated 3 times.*
!image-2020-06-05-14-53-57-056.png|width=1216,height=204!
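To make the expectation concrete, here is a minimal plain-Java sketch of the difference between what I observe (one source scan per pipeline) and what I expected from segment optimization (one shared scan that fans out to all three pipelines). It does not use any Flink API and says nothing about how the planner works internally; `SharedScanSketch`, `scanSource`, and the sink lambdas are hypothetical stand-ins for the custom source and the three sinks above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class SharedScanSketch {
    // Counts how many times the (hypothetical) source has been scanned.
    static final AtomicInteger scans = new AtomicInteger();

    // Hypothetical stand-in for the custom table source: emits three rows per scan.
    static List<Integer> scanSource() {
        scans.incrementAndGet();
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            rows.add(i);
        }
        return rows;
    }

    // Observed behavior: every pipeline runs its own scan of the source.
    static int separateScans(List<Consumer<Integer>> sinks) {
        scans.set(0);
        for (Consumer<Integer> sink : sinks) {
            scanSource().forEach(sink);
        }
        return scans.get();
    }

    // Expected behavior: the source is scanned once and each row goes to every pipeline.
    static int sharedScan(List<Consumer<Integer>> sinks) {
        scans.set(0);
        for (Integer row : scanSource()) {
            for (Consumer<Integer> sink : sinks) {
                sink.accept(row);
            }
        }
        return scans.get();
    }

    public static void main(String[] args) {
        List<Consumer<Integer>> sinks = new ArrayList<>();
        sinks.add(row -> System.out.println("append sink: " + row));
        sinks.add(row -> System.out.println("30s window sink: " + row));
        sinks.add(row -> System.out.println("24h window sink: " + row));

        System.out.println("scans with separate pipelines: " + separateScans(sinks));
        System.out.println("scans with a shared source: " + sharedScan(sinks));
    }
}
```

In both variants every sink receives the same rows; only the number of source scans differs, which is the duplication shown in the screenshots above.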