You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Wu (Jira)" <ji...@apache.org> on 2020/12/17 06:14:00 UTC

[jira] [Created] (FLINK-20637) Converting a Table to a DataStream twice results in two data streams

Wu created FLINK-20637:
--------------------------

             Summary: Converting a Table to a DataStream twice results in two data streams
                 Key: FLINK-20637
                 URL: https://issues.apache.org/jira/browse/FLINK-20637
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream, Connectors / Kafka, Table SQL / API, Table SQL / Planner
    Affects Versions: 1.11.2
            Reporter: Wu


 

Code
{code:java}
//代码占位符
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(50000);
    env.setParallelism(10);    

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
   
    tableEnv.executeSql("create table feeds_expose_click_profile ( docId string ,buuid string ,predictId string ,docType int ,clickLabel int ,viewTime int ,exposeEventTime bigint ,clickEventTime string ,authorId string ,category string ,subCategory string ,keywords string ,tags  string, eventTime bigint,  rowTime as TO_TIMESTAMP(from_unixtime(eventTime / 1000)), WATERMARK FOR rowTime AS rowTime - INTERVAL '5' SECOND) WITH ('connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.ignore-parse-errors' = 'false' )");
    Table table = tableEnv.from("feeds_expose_click_profile");
    TypeInformation<Row> typeInfo = table.getSchema().toRowType();    DataStream dataStream = tableEnv .toRetractStream(table, typeInfo)
        .filter(row -> row.f0)
        .map(row -> row.f1)
        .returns(typeInfo);    
    Table tableFilter = tableEnv.sqlQuery("select buuid, authorId, viewTime, rowTime from feeds_expose_click_profile");
    tableEnv.createTemporaryView("tableFilter", tableFilter);    TypeInformation<Row> typeInfo1 = tableFilter.getSchema().toRowType();
    DataStream dataStream1 = tableEnv .toRetractStream(tableFilter, typeInfo1)
        .filter(row -> row.f0)
        .map(row -> row.f1)
        .returns(typeInfo1);    dataStream1.print();
    System.out.println(env.getExecutionPlan());
{code}
 

 

ExecutionPlan

 
{code:java}
//代码占位符

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
    "pact" : "Data Source",
    "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
    "parallelism" : 10
  }, {
    "id" : 2,
    "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
    "pact" : "Operator",
    "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
    "pact" : "Operator",
    "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Filter",
    "pact" : "Operator",
    "contents" : "Filter",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 6,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
    "pact" : "Data Source",
    "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
    "parallelism" : 10
  }, {
    "id" : 8,
    "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
    "pact" : "Operator",
    "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 7,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 9,
    "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
    "pact" : "Operator",
    "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 8,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
    "pact" : "Operator",
    "contents" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 11,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 10,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 12,
    "type" : "Filter",
    "pact" : "Operator",
    "contents" : "Filter",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 11,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 13,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 12,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 14,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 13,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
{code}
 

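I encountered this problem while using Waterdrop: the execution plan above contains two independent source pipelines reading the same table (nodes 1-6 and nodes 7-13), and only the second one feeds the print sink (node 14). How can I fix this?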

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)