Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2020/07/23 01:34:00 UTC

[jira] [Closed] (FLINK-18145) Segment optimization does not work in blink ?

     [ https://issues.apache.org/jira/browse/FLINK-18145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he closed FLINK-18145.
------------------------------
    Resolution: Invalid

> Segment optimization does not work in blink ?
> ---------------------------------------------
>
>                 Key: FLINK-18145
>                 URL: https://issues.apache.org/jira/browse/FLINK-18145
>             Project: Flink
>          Issue Type: Wish
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Minor
>         Attachments: image-2020-06-05-14-56-01-710.png, image-2020-06-05-14-56-48-625.png, image-2020-06-05-14-57-11-287.png, image-2020-06-09-14-58-44-221.png
>
>
> DAG Segment Optimization: 
>  
> !image-2020-06-05-14-56-01-710.png|width=762,height=264!
> Code:
> {code:java}
> StreamExecutionEnvironment env = EnvUtil.getEnv();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
>
> // Custom source registered as "myTble".
> GeneratorTableSource tableSource = new GeneratorTableSource(2, 1, 70, 0);
> tableEnv.registerTableSource("myTble", tableSource);
> Table mytable = tableEnv.scan("myTble");
> mytable.printSchema();
>
> // Branch 1: convert the table to a DataStream and print it.
> tableEnv.toAppendStream(mytable, Row.class).addSink(new PrintSinkFunction<>()).setParallelism(2);
>
> // Branch 2: 30-second tumbling window on processing time, registered as "t4".
> Table tableproc = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '30' SECOND) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '30' SECOND), key");
> tableproc.printSchema();
> tableEnv.registerTable("t4", tableproc);
>
> // Branch 3: 24-hour tumbling window on processing time, registered as "t3".
> Table table = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '24' HOUR) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '24' HOUR), key");
> table.printSchema();
> tableEnv.registerTable("t3", table);
>
> // Upsert sink for branch 3 ("inserttable" <- t3).
> String[] fields = new String[]{"key", "countkey", "tumblestart"};
> TypeInformation[] fieldsType = new TypeInformation[3];
> fieldsType[0] = Types.INT;
> fieldsType[1] = Types.LONG;
> fieldsType[2] = Types.SQL_TIMESTAMP;
> PrintTableUpsertSink printTableSink = new PrintTableUpsertSink(fields, fieldsType, true);
> tableEnv.registerTableSink("inserttable", printTableSink);
> tableEnv.sqlUpdate("insert into inserttable select key, countkey, tumblestart from t3");
>
> // Upsert sink for branch 2 ("inserttableproc" <- t4).
> String[] fieldsproc = new String[]{"key", "countkey", "tumblestart"};
> TypeInformation[] fieldsTypeproc = new TypeInformation[3];
> fieldsTypeproc[0] = Types.INT;
> fieldsTypeproc[1] = Types.LONG;
> fieldsTypeproc[2] = Types.SQL_TIMESTAMP;
> PrintTableUpsertSink printTableSinkproc = new PrintTableUpsertSink(fieldsproc, fieldsTypeproc, true);
> tableEnv.registerTableSink("inserttableproc", printTableSinkproc);
> tableEnv.sqlUpdate("insert into inserttableproc select key, countkey, tumblestart from t4");
> {code}
> I have a custom table source, then:
>     (1) convert the table to a DataStream with `toAppendStream`, then sink
>     (2) apply one tumbling window, then sink
>     (3) apply another tumbling window, then sink
> but the segment optimization didn't work.
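> A minimal sketch of an alternative wiring, assuming Flink 1.11+ where the StatementSet API from FLIP-84 is available, and assuming the same source, views (`t3`, `t4`) and sinks are registered exactly as in the snippet above. This is not the resolution of this issue, only an illustration of submitting both INSERT statements together so the blink planner can translate and optimize them in one pass:
> {code:java}
> // Hypothetical sketch (Flink 1.11+, org.apache.flink.table.api.StatementSet):
> // add both INSERT statements to one StatementSet so the planner optimizes them
> // together instead of one by one.
> StatementSet stmtSet = tableEnv.createStatementSet();
> stmtSet.addInsertSql("insert into inserttable select key, countkey, tumblestart from t3");
> stmtSet.addInsertSql("insert into inserttableproc select key, countkey, tumblestart from t4");
> // All statements in the set are translated and optimized in a single pass on execute().
> stmtSet.execute();
> {code}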
>  
> !image-2020-06-05-14-57-11-287.png|width=546,height=388!  
>  
> *The source is executed by 3 threads and generates the same data 3 times*
>  
> !image-2020-06-05-14-56-48-625.png!
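> One possible way to check this, continuing the hypothetical StatementSet sketch above: print the combined plan before submitting the job. If the scan of `myTble` appears once per sink rather than once overall, the source is not being reused.
> {code:java}
> // Continuation of the hypothetical sketch above (Flink 1.11+): print the optimized
> // multi-sink plan; a reused source shows up as a single table scan feeding both sinks.
> System.out.println(stmtSet.explain());
> {code}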



--
This message was sent by Atlassian Jira
(v8.3.4#803005)