You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "godfrey he (JIRA)" <ji...@apache.org> on 2017/02/21 02:43:44 UTC

[jira] [Created] (FLINK-5858) Support multiple sinks in same execution DAG

godfrey he created FLINK-5858:
---------------------------------

             Summary: Support multiple sinks in same execution DAG
                 Key: FLINK-5858
                 URL: https://issues.apache.org/jira/browse/FLINK-5858
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: godfrey he


When call writeToSink method to write the Table(with TableSource) to a TableSink, the Table was translated to DataSet or DataStream, if we call writeToSink(write to different sinks) more than once, the Table was also translated more than once. The final execution graph was parted to different DAGs. For example:

{code:title=Example.scala|borderStyle=solid}
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = new CsvTableSource(
      "/tmp/words",
      Array("first", "id", "score", "last"),
      Array(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
      ),
      fieldDelim = "#"
    )

    tEnv.registerTableSource("csv_source", csvTableSource)
    val resultTable = tEnv.scan("csv_source")
      .groupBy('first)
      .select('first, 'score.sum)

    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

    println(tEnv.explain(resultTable))
{code}

result:

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

	Stage 5 : Map
		content : prepare select: (first, SUM(score) AS TMP_0)
		ship_strategy : Forward
		exchange_mode : PIPELINED
		driver_strategy : Map
		Partitioning : RANDOM_PARTITIONED

		Stage 4 : GroupCombine
			content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
			ship_strategy : Forward
			exchange_mode : PIPELINED
			driver_strategy : Sorted Combine
			Partitioning : RANDOM_PARTITIONED

			Stage 3 : GroupReduce
				content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
				ship_strategy : Hash Partition on [0]
				exchange_mode : PIPELINED
				driver_strategy : Sorted Group Reduce
				Partitioning : RANDOM_PARTITIONED

				Stage 2 : Map
					content : to: Row(f0: String, f1: Double)
					ship_strategy : Forward
					exchange_mode : PIPELINED
					driver_strategy : Map
					Partitioning : RANDOM_PARTITIONED

					Stage 1 : Map
						content : Map at emitDataSet(CsvTableSink.scala:67)
						ship_strategy : Forward
						exchange_mode : PIPELINED
						driver_strategy : Map
						Partitioning : RANDOM_PARTITIONED

						Stage 0 : Data Sink
							content : TextOutputFormat (/tmp/wordcount1) - UTF-8
							ship_strategy : Forward
							exchange_mode : PIPELINED
							Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 13 : Data Source
{color}
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

	Stage 12 : Map
		content : prepare select: (first, SUM(score) AS TMP_0)
		ship_strategy : Forward
		exchange_mode : PIPELINED
		driver_strategy : Map
		Partitioning : RANDOM_PARTITIONED

		Stage 11 : GroupCombine
			content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
			ship_strategy : Forward
			exchange_mode : PIPELINED
			driver_strategy : Sorted Combine
			Partitioning : RANDOM_PARTITIONED

			Stage 10 : GroupReduce
				content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
				ship_strategy : Hash Partition on [0]
				exchange_mode : PIPELINED
				driver_strategy : Sorted Group Reduce
				Partitioning : RANDOM_PARTITIONED

				Stage 9 : Map
					content : to: Row(f0: String, f1: Double)
					ship_strategy : Forward
					exchange_mode : PIPELINED
					driver_strategy : Map
					Partitioning : RANDOM_PARTITIONED

					Stage 8 : Map
						content : Map at emitDataSet(CsvTableSink.scala:67)
						ship_strategy : Forward
						exchange_mode : PIPELINED
						driver_strategy : Map
						Partitioning : RANDOM_PARTITIONED

						Stage 7 : Data Sink
							content : TextOutputFormat (/tmp/wordcount2) - UTF-8
							ship_strategy : Forward
							exchange_mode : PIPELINED
							Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 18 : Data Source
{color}
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

	Stage 17 : Map
		content : prepare select: (first, SUM(score) AS TMP_0)
		ship_strategy : Forward
		exchange_mode : PIPELINED
		driver_strategy : Map
		Partitioning : RANDOM_PARTITIONED

		Stage 16 : GroupCombine
			content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
			ship_strategy : Forward
			exchange_mode : PIPELINED
			driver_strategy : Sorted Combine
			Partitioning : RANDOM_PARTITIONED

			Stage 15 : GroupReduce
				content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
				ship_strategy : Hash Partition on [0]
				exchange_mode : PIPELINED
				driver_strategy : Sorted Group Reduce
				Partitioning : RANDOM_PARTITIONED

				Stage 14 : Data Sink
					content : org.apache.flink.api.java.io.DiscardingOutputFormat
					ship_strategy : Forward
					exchange_mode : PIPELINED
					Partitioning : RANDOM_PARTITIONED



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)