You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Timo Walther (Jira)" <ji...@apache.org> on 2020/12/17 07:59:00 UTC

[jira] [Updated] (FLINK-20637) Table convert to dataStream twice will result in two data streams

     [ https://issues.apache.org/jira/browse/FLINK-20637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-20637:
---------------------------------
        Parent: FLINK-19976
    Issue Type: Sub-task  (was: Improvement)

> Table convert to dataStream twice will result in two data streams
> -----------------------------------------------------------------
>
>                 Key: FLINK-20637
>                 URL: https://issues.apache.org/jira/browse/FLINK-20637
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream, Connectors / Kafka, Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.11.2
>            Reporter: Wu
>            Priority: Major
>
>  
> Code
> {code:java}
> // code placeholder
>     EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     env.enableCheckpointing(50000);
>     env.setParallelism(10);    
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>    
>     tableEnv.executeSql("create table feeds_expose_click_profile ( docId string ,buuid string ,predictId string ,docType int ,clickLabel int ,viewTime int ,exposeEventTime bigint ,clickEventTime string ,authorId string ,category string ,subCategory string ,keywords string ,tags  string, eventTime bigint,  rowTime as TO_TIMESTAMP(from_unixtime(eventTime / 1000)), WATERMARK FOR rowTime AS rowTime - INTERVAL '5' SECOND) WITH ('connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.ignore-parse-errors' = 'false' )");
>     Table table = tableEnv.from("feeds_expose_click_profile");
>     TypeInformation<Row> typeInfo = table.getSchema().toRowType();    DataStream dataStream = tableEnv .toRetractStream(table, typeInfo)
>         .filter(row -> row.f0)
>         .map(row -> row.f1)
>         .returns(typeInfo);    
>     Table tableFilter = tableEnv.sqlQuery("select buuid, authorId, viewTime, rowTime from feeds_expose_click_profile");
>     tableEnv.createTemporaryView("tableFilter", tableFilter);    TypeInformation<Row> typeInfo1 = tableFilter.getSchema().toRowType();
>     DataStream dataStream1 = tableEnv .toRetractStream(tableFilter, typeInfo1)
>         .filter(row -> row.f0)
>         .map(row -> row.f1)
>         .returns(typeInfo1);    dataStream1.print();
>     System.out.println(env.getExecutionPlan());
> {code}
>  
>  
> ExecutionPlan
>  
> {code:java}
> // code placeholder
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
>     "pact" : "Data Source",
>     "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
>     "parallelism" : 10
>   }, {
>     "id" : 2,
>     "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
>     "pact" : "Operator",
>     "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 3,
>     "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
>     "pact" : "Operator",
>     "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 2,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 4,
>     "type" : "SinkConversionToTuple2",
>     "pact" : "Operator",
>     "contents" : "SinkConversionToTuple2",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 3,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 5,
>     "type" : "Filter",
>     "pact" : "Operator",
>     "contents" : "Filter",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 4,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 6,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 5,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 7,
>     "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
>     "pact" : "Data Source",
>     "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
>     "parallelism" : 10
>   }, {
>     "id" : 8,
>     "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
>     "pact" : "Operator",
>     "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 7,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 9,
>     "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
>     "pact" : "Operator",
>     "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 8,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 10,
>     "type" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
>     "pact" : "Operator",
>     "contents" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 9,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 11,
>     "type" : "SinkConversionToTuple2",
>     "pact" : "Operator",
>     "contents" : "SinkConversionToTuple2",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 10,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 12,
>     "type" : "Filter",
>     "pact" : "Operator",
>     "contents" : "Filter",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 11,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 13,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 12,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 14,
>     "type" : "Sink: Print to Std. Out",
>     "pact" : "Data Sink",
>     "contents" : "Sink: Print to Std. Out",
>     "parallelism" : 10,
>     "predecessors" : [ {
>       "id" : 13,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
>  
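> I encountered this problem while using Waterdrop. The execution plan above contains two separate TableSourceScan sources (nodes 1 and 7) for the same Kafka table, so the topic is read twice, even though only the second stream (dataStream1) is attached to a sink. How can this be fixed so that the source table is scanned only once?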
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)