You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Wu (Jira)" <ji...@apache.org> on 2020/12/17 06:14:00 UTC
[jira] [Created] (FLINK-20637) Converting a Table to a DataStream twice
results in two data streams
Wu created FLINK-20637:
--------------------------
Summary: Converting a Table to a DataStream twice results in two data streams
Key: FLINK-20637
URL: https://issues.apache.org/jira/browse/FLINK-20637
Project: Flink
Issue Type: Improvement
Components: API / DataStream, Connectors / Kafka, Table SQL / API, Table SQL / Planner
Affects Versions: 1.11.2
Reporter: Wu
Code
{code:java}
// code placeholder
// Reproduction for FLINK-20637: the same Kafka-backed table is converted to a DataStream
// twice via toRetractStream, and the printed execution plan contains two separate
// TableSourceScan nodes for feeds_expose_click_profile (ids 1 and 7) instead of one.

// Blink planner in streaming mode, event time, parallelism 10.
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(50000);
env.setParallelism(10);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Kafka source table in JSON format. rowTime is a computed column built from eventTime
// (divided by 1000 before FROM_UNIXTIME, so eventTime is presumably epoch milliseconds —
// TODO confirm), with a watermark of rowTime - 5 seconds. Connection options (topic,
// bootstrap servers, group id) are blanked out in this report.
tableEnv.executeSql("create table feeds_expose_click_profile ( docId string ,buuid string ,predictId string ,docType int ,clickLabel int ,viewTime int ,exposeEventTime bigint ,clickEventTime string ,authorId string ,category string ,subCategory string ,keywords string ,tags string, eventTime bigint, rowTime as TO_TIMESTAMP(from_unixtime(eventTime / 1000)), WATERMARK FOR rowTime AS rowTime - INTERVAL '5' SECOND) WITH ('connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.ignore-parse-errors' = 'false' )");
Table table = tableEnv.from("feeds_expose_click_profile");
// First conversion: the full table -> retract stream of (flag, Row); keep only records whose
// Boolean flag f0 is true (per the Flink docs, true marks an add/accumulate message) and
// unwrap the Row in f1. Note that `dataStream` is never attached to a sink, yet the plan
// below still contains its operators (ids 1-6: source scan, Calc, WatermarkAssigner,
// SinkConversionToTuple2, Filter, Map).
// NOTE(review): `DataStream` is used as a raw type here; `DataStream<Row>` would be type-safe.
TypeInformation<Row> typeInfo = table.getSchema().toRowType(); DataStream dataStream = tableEnv .toRetractStream(table, typeInfo)
.filter(row -> row.f0)
.map(row -> row.f1)
.returns(typeInfo);
// Second query: a four-column projection over the same source table.
Table tableFilter = tableEnv.sqlQuery("select buuid, authorId, viewTime, rowTime from feeds_expose_click_profile");
// The projection is registered as a temporary view (not referenced again in this snippet) and
// converted with a second, independent toRetractStream call. In the plan this becomes a second
// source scan (id 7) with its own Calc / WatermarkAssigner / projection / conversion / Filter /
// Map chain (ids 8-13), ending in the print sink (id 14).
tableEnv.createTemporaryView("tableFilter", tableFilter); TypeInformation<Row> typeInfo1 = tableFilter.getSchema().toRowType();
DataStream dataStream1 = tableEnv .toRetractStream(tableFilter, typeInfo1)
.filter(row -> row.f0)
.map(row -> row.f1)
.returns(typeInfo1); dataStream1.print();
// Prints the JSON execution plan reproduced below.
System.out.println(env.getExecutionPlan());
{code}
Execution plan
{code:java}
// code placeholder
{
"nodes" : [ {
"id" : 1,
"type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
"pact" : "Data Source",
"contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
"parallelism" : 10
}, {
"id" : 2,
"type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
"pact" : "Operator",
"contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
"parallelism" : 10,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 3,
"type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
"pact" : "Operator",
"contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
"parallelism" : 10,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "SinkConversionToTuple2",
"pact" : "Operator",
"contents" : "SinkConversionToTuple2",
"parallelism" : 10,
"predecessors" : [ {
"id" : 3,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 5,
"type" : "Filter",
"pact" : "Operator",
"contents" : "Filter",
"parallelism" : 10,
"predecessors" : [ {
"id" : 4,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 6,
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
"parallelism" : 10,
"predecessors" : [ {
"id" : 5,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 7,
"type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
"pact" : "Data Source",
"contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
"parallelism" : 10
}, {
"id" : 8,
"type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
"pact" : "Operator",
"contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
"parallelism" : 10,
"predecessors" : [ {
"id" : 7,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 9,
"type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
"pact" : "Operator",
"contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
"parallelism" : 10,
"predecessors" : [ {
"id" : 8,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 10,
"type" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
"pact" : "Operator",
"contents" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
"parallelism" : 10,
"predecessors" : [ {
"id" : 9,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 11,
"type" : "SinkConversionToTuple2",
"pact" : "Operator",
"contents" : "SinkConversionToTuple2",
"parallelism" : 10,
"predecessors" : [ {
"id" : 10,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 12,
"type" : "Filter",
"pact" : "Operator",
"contents" : "Filter",
"parallelism" : 10,
"predecessors" : [ {
"id" : 11,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 13,
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
"parallelism" : 10,
"predecessors" : [ {
"id" : 12,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 14,
"type" : "Sink: Print to Std. Out",
"pact" : "Data Sink",
"contents" : "Sink: Print to Std. Out",
"parallelism" : 10,
"predecessors" : [ {
"id" : 13,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
{code}
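I encountered this problem while using Waterdrop. Expected: both conversions share a single TableSourceScan of feeds_expose_click_profile; actual: the execution plan above contains two independent source scans of the same table (nodes 1 and 7). How can this be fixed?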
--
This message was sent by Atlassian Jira
(v8.3.4#803005)