Posted to issues@flink.apache.org by "godfrey he (JIRA)" <ji...@apache.org> on 2017/02/21 02:45:44 UTC

[jira] [Updated] (FLINK-5858) Support multiple sinks in same execution DAG

     [ https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-5858:
------------------------------
    Description: 
When the writeToSink method is called to write a Table (backed by a TableSource) to a TableSink, the Table is immediately translated into a DataSet or DataStream program. Calling writeToSink more than once (to write to different sinks) therefore translates the Table again for each call, and the final execution graph falls apart into separate, disconnected DAGs. For example:

{code:title=Example.scala|borderStyle=solid}
    // Imports assume the Flink 1.2 Table API package layout
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.sinks.CsvTableSink
    import org.apache.flink.table.sources.CsvTableSource

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = new CsvTableSource(
      "/tmp/words",
      Array("first", "id", "score", "last"),
      Array(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
      ),
      fieldDelim = "#"
    )

    tEnv.registerTableSource("csv_source", csvTableSource)
    val resultTable = tEnv.scan("csv_source")
      .groupBy('first)
      .select('first, 'score.sum)

    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

    println(tEnv.explain(resultTable))
{code}

Result: the physical execution plan splits into three disconnected DAGs, each starting from its own Data Source stage (highlighted in red). Two of them end in the CSV sinks created by the writeToSink calls; the third ends in a DiscardingOutputFormat sink produced by the explain call itself. The source scan and the aggregation are duplicated in every DAG:

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

	Stage 5 : Map
		content : prepare select: (first, SUM(score) AS TMP_0)
		ship_strategy : Forward
		exchange_mode : PIPELINED
		driver_strategy : Map
		Partitioning : RANDOM_PARTITIONED

		Stage 4 : GroupCombine
			content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
			ship_strategy : Forward
			exchange_mode : PIPELINED
			driver_strategy : Sorted Combine
			Partitioning : RANDOM_PARTITIONED

			Stage 3 : GroupReduce
				content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
				ship_strategy : Hash Partition on [0]
				exchange_mode : PIPELINED
				driver_strategy : Sorted Group Reduce
				Partitioning : RANDOM_PARTITIONED

				Stage 2 : Map
					content : to: Row(f0: String, f1: Double)
					ship_strategy : Forward
					exchange_mode : PIPELINED
					driver_strategy : Map
					Partitioning : RANDOM_PARTITIONED

					Stage 1 : Map
						content : Map at emitDataSet(CsvTableSink.scala:67)
						ship_strategy : Forward
						exchange_mode : PIPELINED
						driver_strategy : Map
						Partitioning : RANDOM_PARTITIONED

						Stage 0 : Data Sink
							content : TextOutputFormat (/tmp/wordcount1) - UTF-8
							ship_strategy : Forward
							exchange_mode : PIPELINED
							Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 13 : Data Source
{color}
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

	Stage 12 : Map
		content : prepare select: (first, SUM(score) AS TMP_0)
		ship_strategy : Forward
		exchange_mode : PIPELINED
		driver_strategy : Map
		Partitioning : RANDOM_PARTITIONED

		Stage 11 : GroupCombine
			content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
			ship_strategy : Forward
			exchange_mode : PIPELINED
			driver_strategy : Sorted Combine
			Partitioning : RANDOM_PARTITIONED

			Stage 10 : GroupReduce
				content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
				ship_strategy : Hash Partition on [0]
				exchange_mode : PIPELINED
				driver_strategy : Sorted Group Reduce
				Partitioning : RANDOM_PARTITIONED

				Stage 9 : Map
					content : to: Row(f0: String, f1: Double)
					ship_strategy : Forward
					exchange_mode : PIPELINED
					driver_strategy : Map
					Partitioning : RANDOM_PARTITIONED

					Stage 8 : Map
						content : Map at emitDataSet(CsvTableSink.scala:67)
						ship_strategy : Forward
						exchange_mode : PIPELINED
						driver_strategy : Map
						Partitioning : RANDOM_PARTITIONED

						Stage 7 : Data Sink
							content : TextOutputFormat (/tmp/wordcount2) - UTF-8
							ship_strategy : Forward
							exchange_mode : PIPELINED
							Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 18 : Data Source
{color}
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

	Stage 17 : Map
		content : prepare select: (first, SUM(score) AS TMP_0)
		ship_strategy : Forward
		exchange_mode : PIPELINED
		driver_strategy : Map
		Partitioning : RANDOM_PARTITIONED

		Stage 16 : GroupCombine
			content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
			ship_strategy : Forward
			exchange_mode : PIPELINED
			driver_strategy : Sorted Combine
			Partitioning : RANDOM_PARTITIONED

			Stage 15 : GroupReduce
				content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
				ship_strategy : Hash Partition on [0]
				exchange_mode : PIPELINED
				driver_strategy : Sorted Group Reduce
				Partitioning : RANDOM_PARTITIONED

				Stage 14 : Data Sink
					content : org.apache.flink.api.java.io.DiscardingOutputFormat
					ship_strategy : Forward
					exchange_mode : PIPELINED
					Partitioning : RANDOM_PARTITIONED
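
As the plan shows, each of the three DAGs re-reads the source and recomputes the same aggregation. Until the Table API translates all registered sinks into a single program, a possible workaround is to translate the Table into a DataSet once and attach both sinks to that DataSet. This is a minimal sketch, assuming the Flink 1.2 Scala APIs; the tuple type (String, Double) matches the Row(f0: String, f1: Double) schema visible in the plan above:

{code:title=Workaround.scala|borderStyle=solid}
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala._

    // Translate the Table once; both sinks then hang off the same
    // source scan and aggregation, yielding a single DAG.
    val resultSet: DataSet[(String, Double)] = resultTable.toDataSet[(String, Double)]
    resultSet.writeAsCsv("/tmp/wordcount1")
    resultSet.writeAsCsv("/tmp/wordcount2")
    env.execute()
{code}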


> Support multiple sinks in same execution DAG
> --------------------------------------------
>
>                 Key: FLINK-5858
>                 URL: https://issues.apache.org/jira/browse/FLINK-5858
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: godfrey he
>


