Posted to issues@flink.apache.org by "godfrey he (JIRA)" <ji...@apache.org> on 2017/02/21 02:45:44 UTC
[jira] [Updated] (FLINK-5858) Support multiple sinks in same execution DAG
[ https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-5858:
------------------------------
Description:
When writeToSink is called to write a Table (backed by a TableSource) to a TableSink, the Table is translated into a DataSet or DataStream. If writeToSink is called more than once on the same Table (to write to different sinks), the Table is translated again for each call. As a result, the final execution graph is split into separate DAGs, each of which reads the source and computes the aggregation on its own. For example:
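{code:title=Example.scala|borderStyle=solid}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// '#'-delimited CSV file with four columns
val csvTableSource = new CsvTableSource(
  "/tmp/words",
  Array("first", "id", "score", "last"),
  Array(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ),
  fieldDelim = "#"
)
tEnv.registerTableSource("csv_source", csvTableSource)

// sum of 'score for each distinct 'first
val resultTable = tEnv.scan("csv_source")
  .groupBy('first)
  .select('first, 'score.sum)

// each writeToSink call translates resultTable separately
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

// prints the AST, the optimized logical plan and the physical plan
println(tEnv.explain(resultTable))
{code}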
Output:
== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])
== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])
== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 5 : Map
content : prepare select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 4 : GroupCombine
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 3 : GroupReduce
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 2 : Map
content : to: Row(f0: String, f1: Double)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 1 : Map
content : Map at emitDataSet(CsvTableSink.scala:67)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 0 : Data Sink
content : TextOutputFormat (/tmp/wordcount1) - UTF-8
ship_strategy : Forward
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
{color:red}
Stage 13 : Data Source
{color}
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 12 : Map
content : prepare select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 11 : GroupCombine
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 10 : GroupReduce
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 9 : Map
content : to: Row(f0: String, f1: Double)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 8 : Map
content : Map at emitDataSet(CsvTableSink.scala:67)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 7 : Data Sink
content : TextOutputFormat (/tmp/wordcount2) - UTF-8
ship_strategy : Forward
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
{color:red}
Stage 18 : Data Source
{color}
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 17 : Map
content : prepare select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 16 : GroupCombine
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 15 : GroupReduce
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 14 : Data Sink
content : org.apache.flink.api.java.io.DiscardingOutputFormat
ship_strategy : Forward
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
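The duplication can be illustrated with a small, self-contained Scala model. This is only a sketch: LogicalNode, PhysicalNode and Translator are hypothetical toy types, not Flink classes, and caching translations by logical node is one possible approach, not how Flink implements it. Translating the table once per sink (what the report describes for writeToSink) creates a separate scan and aggregation for every sink; memoizing the translation of each logical node lets both sinks share one subplan, which is the single-DAG behaviour this issue asks for.

{code:title=MultiSinkSketch.scala|borderStyle=solid}
import scala.collection.mutable

// Hypothetical toy model of Table-to-DataSet translation; these are NOT Flink classes.

// A translated (physical) operator and the operators it reads from.
final case class PhysicalNode(id: Int, name: String, inputs: List[PhysicalNode])

// A logical plan node. Plain class, so map lookups use reference identity:
// the same LogicalNode object stands for "the same Table".
final class LogicalNode(val name: String, val inputs: List[LogicalNode])

final class Translator(shareSubplans: Boolean) {
  private var nextId = 0
  // Logical node -> its translation; consulted only when shareSubplans is true.
  private val cache = mutable.Map.empty[LogicalNode, PhysicalNode]
  // Every physical operator created, i.e. the size of the final graph.
  val created = mutable.ArrayBuffer.empty[PhysicalNode]

  def translate(node: LogicalNode): PhysicalNode =
    if (!shareSubplans) build(node)
    else cache.get(node) match {
      case Some(p) => p // reuse the operator translated for an earlier sink
      case None =>
        val p = build(node)
        cache(node) = p
        p
    }

  private def build(node: LogicalNode): PhysicalNode = {
    val inputs = node.inputs.map(translate)
    val p = PhysicalNode(nextId, node.name, inputs)
    nextId += 1
    created += p
    p
  }

  // Mimics calling writeToSink once per path on the same table.
  def writeToSinks(table: LogicalNode, paths: List[String]): List[PhysicalNode] =
    paths.map(path => build(new LogicalNode(s"Sink($path)", List(table))))
}

val scan = new LogicalNode("Scan(csv_source)", Nil)
val aggregate = new LogicalNode("Aggregate(first, SUM(score))", List(scan))
val paths = List("/tmp/wordcount1", "/tmp/wordcount2")

val perSink = new Translator(shareSubplans = false)
val perSinkSinks = perSink.writeToSinks(aggregate, paths)
val shared = new Translator(shareSubplans = true)
val sharedSinks = shared.writeToSinks(aggregate, paths)

println(s"translated per sink: ${perSink.created.size} operators") // 6
println(s"shared subplan: ${shared.created.size} operators") // 4
{code}

With sharing enabled, both sink operators read from the same aggregation operator (sharedSinks(0).inputs.head eq sharedSinks(1).inputs.head), so the scan and aggregation appear once in the graph instead of once per sink.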
> Support multiple sinks in same execution DAG
> --------------------------------------------
>
> Key: FLINK-5858
> URL: https://issues.apache.org/jira/browse/FLINK-5858
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: godfrey he
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)