Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2020/07/23 01:34:00 UTC
[jira] [Closed] (FLINK-18145) Segment optimization does not work in blink ?
[ https://issues.apache.org/jira/browse/FLINK-18145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he closed FLINK-18145.
------------------------------
Resolution: Invalid
> Segment optimization does not work in blink ?
> ---------------------------------------------
>
> Key: FLINK-18145
> URL: https://issues.apache.org/jira/browse/FLINK-18145
> Project: Flink
> Issue Type: Wish
> Components: Table SQL / Planner
> Reporter: hehuiyuan
> Priority: Minor
> Attachments: image-2020-06-05-14-56-01-710.png, image-2020-06-05-14-56-48-625.png, image-2020-06-05-14-57-11-287.png, image-2020-06-09-14-58-44-221.png
>
>
> DAG Segment Optimization:
>
> !image-2020-06-05-14-56-01-710.png|width=762,height=264!
> Code:
> {code:java}
> // custom generator source registered as table "myTble"
> StreamExecutionEnvironment env = EnvUtil.getEnv();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
> GeneratorTableSource tableSource = new GeneratorTableSource(2, 1, 70, 0);
> tableEnv.registerTableSource("myTble", tableSource);
> Table mytable = tableEnv.scan("myTble");
> mytable.printSchema();
> // (1) append stream -> print sink
> tableEnv.toAppendStream(mytable, Row.class).addSink(new PrintSinkFunction<>()).setParallelism(2);
> // (2) 30-second tumble window -> upsert sink (registered as t4)
> Table tableproc = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '30' SECOND) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '30' SECOND), key");
> tableproc.printSchema();
> tableEnv.registerTable("t4", tableproc);
> // (3) 24-hour tumble window -> upsert sink (registered as t3)
> Table table = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '24' HOUR) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '24' HOUR), key");
> table.printSchema();
> tableEnv.registerTable("t3", table);
> String[] fields = new String[]{"key", "countkey", "tumblestart"};
> TypeInformation[] fieldsType = new TypeInformation[]{Types.INT, Types.LONG, Types.SQL_TIMESTAMP};
> PrintTableUpsertSink printTableSink = new PrintTableUpsertSink(fields, fieldsType, true);
> tableEnv.registerTableSink("inserttable", printTableSink);
> tableEnv.sqlUpdate("insert into inserttable select key, countkey, tumblestart from t3");
> String[] fieldsproc = new String[]{"key", "countkey", "tumblestart"};
> TypeInformation[] fieldsTypeproc = new TypeInformation[]{Types.INT, Types.LONG, Types.SQL_TIMESTAMP};
> PrintTableUpsertSink printTableSinkproc = new PrintTableUpsertSink(fieldsproc, fieldsTypeproc, true);
> tableEnv.registerTableSink("inserttableproc", printTableSinkproc);
> tableEnv.sqlUpdate("insert into inserttableproc select key, countkey, tumblestart from t4");
> {code}
> I have a custom table source, and then:
> (1) transform it to a DataStream with the `toAppendStream` method, then sink
> (2) apply a tumble window, then sink
> (3) apply another tumble window, then sink
> but the segment optimization didn't work.
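> As a side note (an assumption on my part, not stated in this thread): sub-plan and source reuse in the blink planner is controlled by optimizer configuration options, both of which default to true. A minimal sketch of setting them explicitly, assuming Flink 1.10's OptimizerConfigOptions:
> {code:java}
> // Sketch (assumption): explicitly enable segment/source reuse in the blink planner.
> // Both options default to true in Flink 1.10.
> tableEnv.getConfig().getConfiguration().setBoolean(
>     OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true);
> tableEnv.getConfig().getConfiguration().setBoolean(
>     OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, true);
> {code}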
>
> !image-2020-06-05-14-57-11-287.png|width=546,height=388!
>
> *The source is executed by 3 threads and generates duplicate data 3 times*
>
> !image-2020-06-05-14-56-48-625.png!
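> For completeness, a hedged sketch of how multiple INSERTs can be submitted as one jointly optimized job on Flink 1.11+ via StatementSet (an API newer than the code above, so this is illustrative only, not the reporter's setup):
> {code:java}
> // Sketch (assumption: Flink 1.11+): group both INSERT statements into one
> // StatementSet so they are optimized together and common segments
> // (e.g. the shared source scan) can be reused.
> StatementSet stmtSet = tableEnv.createStatementSet();
> stmtSet.addInsertSql("insert into inserttable select key, countkey, tumblestart from t3");
> stmtSet.addInsertSql("insert into inserttableproc select key, countkey, tumblestart from t4");
> stmtSet.execute(); // single job submission, jointly optimized
> {code}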
--
This message was sent by Atlassian Jira
(v8.3.4#803005)