Posted to issues@flink.apache.org by "Kurt Young (JIRA)" <ji...@apache.org> on 2019/01/30 10:25:00 UTC

[jira] [Closed] (FLINK-5858) Support multiple sinks in same execution DAG

     [ https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kurt Young closed FLINK-5858.
-----------------------------
    Resolution: Unresolved

> Support multiple sinks in same execution DAG
> --------------------------------------------
>
>                 Key: FLINK-5858
>                 URL: https://issues.apache.org/jira/browse/FLINK-5858
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>
> When the writeToSink method is called to write a Table (backed by a TableSource) to a TableSink, the Table is translated into a DataSet or DataStream program. If writeToSink is called more than once (to write to different sinks), the Table is translated once per call, so shared operators are duplicated and the final execution graph falls apart into disconnected DAGs. For example:
> {code:title=Example.scala|borderStyle=solid}
>     import org.apache.flink.api.common.typeinfo.BasicTypeInfo
>     import org.apache.flink.api.scala.ExecutionEnvironment
>     import org.apache.flink.table.api.TableEnvironment
>     import org.apache.flink.table.api.scala._
>     import org.apache.flink.table.sinks.CsvTableSink
>     import org.apache.flink.table.sources.CsvTableSource
>
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val csvTableSource = new CsvTableSource(
>       "/tmp/words",
>       Array("first", "id", "score", "last"),
>       Array(
>         BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.INT_TYPE_INFO,
>         BasicTypeInfo.DOUBLE_TYPE_INFO,
>         BasicTypeInfo.STRING_TYPE_INFO
>       ),
>       fieldDelim = "#"
>     )
>     tEnv.registerTableSource("csv_source", csvTableSource)
>     val resultTable = tEnv.scan("csv_source")
>       .groupBy('first)
>       .select('first, 'score.sum)
>     // Each writeToSink call translates resultTable again, producing an
>     // independent DAG per sink.
>     resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
>     resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))
>     // explain() triggers a third translation (the DiscardingOutputFormat
>     // stage in the plan below).
>     println(tEnv.explain(resultTable))
> {code}
> Results (note the three disconnected DAGs, each rooted at one of the red Data Source stages 6, 13, and 18: one per sink, plus one for the explain call):
> == Abstract Syntax Tree ==
> LogicalProject(first=[$0], TMP_1=[$1])
>   LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
>     LogicalProject(first=[$0], score=[$2])
>       LogicalTableScan(table=[[csv_source]])
> == Optimized Logical Plan ==
> DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
>   BatchTableSourceScan(table=[[csv_source]], fields=[first, score])
> == Physical Execution Plan ==
> {color:red}
> Stage 6 : Data Source
> {color}
> 	content : collect elements with CollectionInputFormat
> 	Partitioning : RANDOM_PARTITIONED
> 	Stage 5 : Map
> 		content : prepare select: (first, SUM(score) AS TMP_0)
> 		ship_strategy : Forward
> 		exchange_mode : PIPELINED
> 		driver_strategy : Map
> 		Partitioning : RANDOM_PARTITIONED
> 		Stage 4 : GroupCombine
> 			content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
> 			ship_strategy : Forward
> 			exchange_mode : PIPELINED
> 			driver_strategy : Sorted Combine
> 			Partitioning : RANDOM_PARTITIONED
> 			Stage 3 : GroupReduce
> 				content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
> 				ship_strategy : Hash Partition on [0]
> 				exchange_mode : PIPELINED
> 				driver_strategy : Sorted Group Reduce
> 				Partitioning : RANDOM_PARTITIONED
> 				Stage 2 : Map
> 					content : to: Row(f0: String, f1: Double)
> 					ship_strategy : Forward
> 					exchange_mode : PIPELINED
> 					driver_strategy : Map
> 					Partitioning : RANDOM_PARTITIONED
> 					Stage 1 : Map
> 						content : Map at emitDataSet(CsvTableSink.scala:67)
> 						ship_strategy : Forward
> 						exchange_mode : PIPELINED
> 						driver_strategy : Map
> 						Partitioning : RANDOM_PARTITIONED
> 						Stage 0 : Data Sink
> 							content : TextOutputFormat (/tmp/wordcount1) - UTF-8
> 							ship_strategy : Forward
> 							exchange_mode : PIPELINED
> 							Partitioning : RANDOM_PARTITIONED
> {color:red}
> Stage 13 : Data Source
> {color}
> 	content : collect elements with CollectionInputFormat
> 	Partitioning : RANDOM_PARTITIONED
> 	Stage 12 : Map
> 		content : prepare select: (first, SUM(score) AS TMP_0)
> 		ship_strategy : Forward
> 		exchange_mode : PIPELINED
> 		driver_strategy : Map
> 		Partitioning : RANDOM_PARTITIONED
> 		Stage 11 : GroupCombine
> 			content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
> 			ship_strategy : Forward
> 			exchange_mode : PIPELINED
> 			driver_strategy : Sorted Combine
> 			Partitioning : RANDOM_PARTITIONED
> 			Stage 10 : GroupReduce
> 				content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
> 				ship_strategy : Hash Partition on [0]
> 				exchange_mode : PIPELINED
> 				driver_strategy : Sorted Group Reduce
> 				Partitioning : RANDOM_PARTITIONED
> 				Stage 9 : Map
> 					content : to: Row(f0: String, f1: Double)
> 					ship_strategy : Forward
> 					exchange_mode : PIPELINED
> 					driver_strategy : Map
> 					Partitioning : RANDOM_PARTITIONED
> 					Stage 8 : Map
> 						content : Map at emitDataSet(CsvTableSink.scala:67)
> 						ship_strategy : Forward
> 						exchange_mode : PIPELINED
> 						driver_strategy : Map
> 						Partitioning : RANDOM_PARTITIONED
> 						Stage 7 : Data Sink
> 							content : TextOutputFormat (/tmp/wordcount2) - UTF-8
> 							ship_strategy : Forward
> 							exchange_mode : PIPELINED
> 							Partitioning : RANDOM_PARTITIONED
> {color:red}
> Stage 18 : Data Source
> {color}
> 	content : collect elements with CollectionInputFormat
> 	Partitioning : RANDOM_PARTITIONED
> 	Stage 17 : Map
> 		content : prepare select: (first, SUM(score) AS TMP_0)
> 		ship_strategy : Forward
> 		exchange_mode : PIPELINED
> 		driver_strategy : Map
> 		Partitioning : RANDOM_PARTITIONED
> 		Stage 16 : GroupCombine
> 			content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
> 			ship_strategy : Forward
> 			exchange_mode : PIPELINED
> 			driver_strategy : Sorted Combine
> 			Partitioning : RANDOM_PARTITIONED
> 			Stage 15 : GroupReduce
> 				content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
> 				ship_strategy : Hash Partition on [0]
> 				exchange_mode : PIPELINED
> 				driver_strategy : Sorted Group Reduce
> 				Partitioning : RANDOM_PARTITIONED
> 				Stage 14 : Data Sink
> 					content : org.apache.flink.api.java.io.DiscardingOutputFormat
> 					ship_strategy : Forward
> 					exchange_mode : PIPELINED
> 					Partitioning : RANDOM_PARTITIONED
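> For comparison, a minimal sketch of how later Flink releases (1.11+) address this with the StatementSet API, which plans several INSERTs as a single optimized DAG. This is an illustrative sketch, not code from this issue; it assumes the tables csv_source, wordcount1, and wordcount2 have already been registered (e.g. via CREATE TABLE DDL):
> {code:title=MultiSinkSketch.scala|borderStyle=solid}
>     import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
>
>     // Assumed setup: a batch TableEnvironment with csv_source, wordcount1,
>     // and wordcount2 registered beforehand.
>     val tEnv = TableEnvironment.create(
>       EnvironmentSettings.newInstance().inBatchMode().build())
>
>     // Collect both INSERTs into one statement set instead of translating
>     // each sink independently.
>     val stmtSet = tEnv.createStatementSet()
>     stmtSet.addInsertSql(
>       "INSERT INTO wordcount1 SELECT `first`, SUM(score) FROM csv_source GROUP BY `first`")
>     stmtSet.addInsertSql(
>       "INSERT INTO wordcount2 SELECT `first`, SUM(score) FROM csv_source GROUP BY `first`")
>
>     // Both INSERTs are optimized together: the scan and the aggregation
>     // appear once in the plan, and the result fans out to both sinks.
>     stmtSet.execute()
> {code}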



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)