Posted to dev@flink.apache.org by "godfrey he (JIRA)" <ji...@apache.org> on 2017/02/21 02:43:44 UTC
[jira] [Created] (FLINK-5858) Support multiple sinks in same execution DAG
godfrey he created FLINK-5858:
---------------------------------
Summary: Support multiple sinks in same execution DAG
Key: FLINK-5858
URL: https://issues.apache.org/jira/browse/FLINK-5858
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: godfrey he
When writeToSink is called to write a Table (backed by a TableSource) to a TableSink, the Table is translated into a DataSet or DataStream. If writeToSink is called more than once (to write to different sinks), the Table is also translated more than once, so the final execution graph is split into separate DAGs, one per translation, instead of a single DAG with multiple sinks. For example:
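{code:title=Example.scala|borderStyle=solid}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// CSV source with '#'-separated fields
val csvTableSource = new CsvTableSource(
  "/tmp/words",
  Array("first", "id", "score", "last"),
  Array(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ),
  fieldDelim = "#"
)
tEnv.registerTableSource("csv_source", csvTableSource)

// Sum of score per distinct value of first
val resultTable = tEnv.scan("csv_source")
  .groupBy('first)
  .select('first, 'score.sum)

// Each writeToSink call translates resultTable separately
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

// Print the AST, the optimized logical plan and the physical execution plan
println(tEnv.explain(resultTable))
{code}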
Result (each separate DAG begins with its own Data Source stage, highlighted in red):
== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 5 : Map
content : prepare select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 4 : GroupCombine
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 3 : GroupReduce
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 2 : Map
content : to: Row(f0: String, f1: Double)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 1 : Map
content : Map at emitDataSet(CsvTableSink.scala:67)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 0 : Data Sink
content : TextOutputFormat (/tmp/wordcount1) - UTF-8
ship_strategy : Forward
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
{color:red}
Stage 13 : Data Source
{color}
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 12 : Map
content : prepare select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 11 : GroupCombine
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 10 : GroupReduce
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 9 : Map
content : to: Row(f0: String, f1: Double)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 8 : Map
content : Map at emitDataSet(CsvTableSink.scala:67)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 7 : Data Sink
content : TextOutputFormat (/tmp/wordcount2) - UTF-8
ship_strategy : Forward
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
{color:red}
Stage 18 : Data Source
{color}
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 17 : Map
content : prepare select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 16 : GroupCombine
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Sorted Combine
Partitioning : RANDOM_PARTITIONED
Stage 15 : GroupReduce
content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
ship_strategy : Hash Partition on [0]
exchange_mode : PIPELINED
driver_strategy : Sorted Group Reduce
Partitioning : RANDOM_PARTITIONED
Stage 14 : Data Sink
content : org.apache.flink.api.java.io.DiscardingOutputFormat
ship_strategy : Forward
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
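One way to picture the requested improvement is to translate a Table once and let every sink consume that shared translation. The self-contained Scala sketch below is a toy model built on that assumption only: `LogicalNode`, `PhysicalNode`, `Sink`, `MemoizingTranslator` and `countVertices` are hypothetical names, not Flink classes, and this is not presented as how Flink's planner works or will work. Translating on every write gives each sink its own source scan, mirroring the separate DAGs above, whereas memoizing translation per logical node lets both sinks share one scan and one aggregate.

```scala
import scala.collection.mutable

// Hypothetical toy model, NOT Flink classes: a logical plan node
// (standing in for a Table's plan) with structural equality.
final case class LogicalNode(name: String, inputs: List[LogicalNode] = Nil)

// A physical operator. A plain class, so equality is reference identity
// and each instance models one distinct vertex in the execution graph.
final class PhysicalNode(val name: String, val inputs: List[PhysicalNode])

// A sink attached to one translated plan.
final case class Sink(path: String, input: PhysicalNode)

object MultiSinkSketch {
  val ScanName = "Scan(csv_source)"

  // Mirrors the example: scan csv_source, then group by first and sum score.
  val plan: LogicalNode =
    LogicalNode("Aggregate(groupBy=first, sum(score))", List(LogicalNode(ScanName)))

  // Behaviour described in this issue: every write translates the plan from
  // scratch, so each sink gets its own private copy of the pipeline.
  def translateEachTime(p: LogicalNode): PhysicalNode =
    new PhysicalNode(p.name, p.inputs.map(translateEachTime))

  // Requested behaviour (sketch): memoize translation per logical node so
  // all sinks written from the same plan share the same physical vertices.
  final class MemoizingTranslator {
    private val cache = mutable.Map.empty[LogicalNode, PhysicalNode]

    def translate(p: LogicalNode): PhysicalNode = cache.get(p) match {
      case Some(node) => node
      case None =>
        val node = new PhysicalNode(p.name, p.inputs.map(translate))
        cache(p) = node
        node
    }
  }

  // Count distinct vertices with the given name reachable from any sink.
  def countVertices(sinks: Seq[Sink], name: String): Int = {
    val seen = mutable.Set.empty[PhysicalNode]
    def visit(n: PhysicalNode): Unit =
      if (seen.add(n)) n.inputs.foreach(visit)
    sinks.foreach(s => visit(s.input))
    seen.count(_.name == name)
  }

  // Two writes, two independent translations: two disjoint DAGs.
  val naiveSinks: Seq[Sink] = Seq(
    Sink("/tmp/wordcount1", translateEachTime(plan)),
    Sink("/tmp/wordcount2", translateEachTime(plan)))

  // Two writes through one shared translator: one DAG with two sinks.
  val sharedSinks: Seq[Sink] = {
    val translator = new MemoizingTranslator
    Seq(
      Sink("/tmp/wordcount1", translator.translate(plan)),
      Sink("/tmp/wordcount2", translator.translate(plan)))
  }

  def main(args: Array[String]): Unit = {
    println(s"naive:  ${countVertices(naiveSinks, ScanName)} scan vertices")  // naive:  2 scan vertices
    println(s"shared: ${countVertices(sharedSinks, ScanName)} scan vertices") // shared: 1 scan vertices
  }
}
```

Keying the cache on structurally equal logical nodes means any two writes of the same plan reuse one subgraph; whether such reuse is always safe in a real optimizer is left open by this sketch.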