Posted to dev@flink.apache.org by "hehuiyuan (Jira)" <ji...@apache.org> on 2020/06/05 06:56:00 UTC

[jira] [Created] (FLINK-18145) Segment optimization does not work ?

hehuiyuan created FLINK-18145:
---------------------------------

             Summary: Segment optimization does not work ?
                 Key: FLINK-18145
                 URL: https://issues.apache.org/jira/browse/FLINK-18145
             Project: Flink
          Issue Type: Wish
            Reporter: hehuiyuan


DAG Segment Optimization: !image-2020-06-05-14-40-03-123.png|width=569,height=226!

Code:
{code:java}
StreamExecutionEnvironment env = EnvUtil.getEnv();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

GeneratorTableSource tableSource = new GeneratorTableSource(2, 1, 70, 0);
tableEnv.registerTableSource("myTble", tableSource);
Table mytable = tableEnv.scan("myTble");
mytable.printSchema();

// (1) convert the table to an append stream and sink it directly
tableEnv.toAppendStream(mytable, Row.class).addSink(new PrintSinkFunction<>()).setParallelism(2);

// (2) 30-second tumbling window on processing time
Table tableproc = tableEnv.sqlQuery(
    "SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '30' SECOND) as tumblestart " +
    "FROM myTble GROUP BY TUMBLE(proctime, INTERVAL '30' SECOND), key");
tableproc.printSchema();
tableEnv.registerTable("t4", tableproc);

// (3) 24-hour tumbling window on processing time
Table table = tableEnv.sqlQuery(
    "SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '24' HOUR) as tumblestart " +
    "FROM myTble GROUP BY TUMBLE(proctime, INTERVAL '24' HOUR), key");
table.printSchema();
tableEnv.registerTable("t3", table);

// sink for the 24-hour aggregation (t3)
String[] fields = new String[]{"key", "countkey", "tumblestart"};
TypeInformation[] fieldsType = new TypeInformation[3];
fieldsType[0] = Types.INT;
fieldsType[1] = Types.LONG;
fieldsType[2] = Types.SQL_TIMESTAMP;
PrintTableUpsertSink printTableSink = new PrintTableUpsertSink(fields, fieldsType, true);
tableEnv.registerTableSink("inserttable", printTableSink);
tableEnv.sqlUpdate("insert into inserttable select key, countkey, tumblestart from t3");

// sink for the 30-second aggregation (t4)
String[] fieldsproc = new String[]{"key", "countkey", "tumblestart"};
TypeInformation[] fieldsTypeproc = new TypeInformation[3];
fieldsTypeproc[0] = Types.INT;
fieldsTypeproc[1] = Types.LONG;
fieldsTypeproc[2] = Types.SQL_TIMESTAMP;
PrintTableUpsertSink printTableSinkproc = new PrintTableUpsertSink(fieldsproc, fieldsTypeproc, true);
tableEnv.registerTableSink("inserttableproc", printTableSinkproc);
tableEnv.sqlUpdate("insert into inserttableproc select key, countkey, tumblestart from t4");
{code}
I have a custom table source, and then:

    (1) convert the table to a DataStream with `toAppendStream`, then sink it

    (2) run a tumbling-window aggregation, then sink the result

    (3) run another tumbling-window aggregation, then sink the result

but the segment optimization did not work.
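For reference, the Blink planner exposes sub-plan and source reuse through configuration options. The sketch below (assuming Flink 1.10 option names; both default to true, so this only makes the expectation explicit) shows how to enable them on the TableEnvironment used above:

{code:java}
// Sketch: explicitly enable sub-plan / source reuse in the Blink planner.
// Option keys as documented for Flink 1.10; both default to true.
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
conf.setBoolean("table.optimizer.reuse-source-enabled", true);
{code}

Note that this reuse happens when the planner optimizes the SQL sinks together; a branch translated eagerly via `toAppendStream` may not participate in it.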

*!image-2020-06-05-14-50-33-759.png|width=458,height=336!*

 

*The source is executed by 3 threads and generates the same data 3 times, producing duplicates*

!image-2020-06-05-14-53-57-056.png|width=1216,height=204!
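One way to check whether the planner actually shares the source is to print the optimized plan before submitting the job. Assuming the Flink 1.10 Blink planner, `explain(true)` covers the sinks registered via `sqlUpdate`; the `toAppendStream` branch is translated separately and will not appear in this output:

{code:java}
// Sketch: inspect the optimized plan to see whether the source node is reused.
// explain(true) prints the AST, the optimized logical plan and the physical
// execution plan for all buffered sqlUpdate() sinks.
System.out.println(tableEnv.explain(true));
{code}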



--
This message was sent by Atlassian Jira
(v8.3.4#803005)