Posted to commits@flink.apache.org by li...@apache.org on 2020/07/13 15:18:42 UTC

[flink] 02/02: [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule.

This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66353f27c4c6481443d1f04a8f23e7f98dd7beda
Author: libenchao <li...@gmail.com>
AuthorDate: Mon Apr 6 16:22:28 2020 +0800

    [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule.
    
    This closes #11643
---
 .../stream/StreamExecTemporalSortRule.scala        | 15 ++++++--
 .../table/planner/plan/stream/sql/SortTest.xml     |  6 ++-
 .../runtime/stream/sql/TemporalSortITCase.scala    | 43 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 6 deletions(-)

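The core of the change: a TemporalSort produces a globally time-ordered stream, so its input has to be gathered onto a single task. By requiring FlinkRelDistribution.SINGLETON on the input (and providing it on the new physical node), the planner now inserts an Exchange(distribution=[single]) below the TemporalSort, as the updated plans in SortTest.xml show. Below is a minimal sketch of the kind of query that is planned through this rule, assuming Flink's Scala bridge API and a registered table Orders with an event-time attribute rowtime; all names in it are illustrative assumptions, not taken from the commit.

    // Minimal sketch, assuming Flink's Scala bridge API and that a table
    // Orders(id BIGINT, amount DOUBLE, rowtime event-time) has been registered.
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row

    object TemporalSortExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(4) // a parallel input is exactly the case FLINK-16827 covers
        val tEnv = StreamTableEnvironment.create(env)

        // registration of the Orders table (with a rowtime attribute) is assumed here

        // ORDER BY on a time attribute is planned as a TemporalSort; with this
        // change the planner places Exchange(distribution=[single]) below it,
        // so the globally sorted output is produced from a single input channel.
        val sorted = tEnv.sqlQuery("SELECT id, amount FROM Orders ORDER BY rowtime")
        sorted.toRetractStream[Row].print()
        env.execute()
      }
    }
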
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
index cf17cc5..3490526 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
@@ -21,8 +21,9 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalSort
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
@@ -46,12 +47,18 @@ class StreamExecTemporalSortRule
   override def convert(rel: RelNode): RelNode = {
     val sort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
     val input = sort.getInput()
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    val convInput: RelNode = RelOptRule.convert(input, FlinkConventions.STREAM_PHYSICAL)
+    val requiredTraitSet = input.getTraitSet
+      .replace(FlinkRelDistribution.SINGLETON)
+      .replace(FlinkConventions.STREAM_PHYSICAL)
+    val providedTraitSet = sort.getTraitSet
+      .replace(FlinkRelDistribution.SINGLETON)
+      .replace(FlinkConventions.STREAM_PHYSICAL)
+
+    val convInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
 
     new StreamExecTemporalSort(
       rel.getCluster,
-      traitSet,
+      providedTraitSet,
       convInput,
       sort.collation)
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
index af6e441..a1d191e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
@@ -32,7 +32,8 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a])
 +- TemporalSort(orderBy=[proctime ASC, c ASC])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+   +- Exchange(distribution=[single])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -136,7 +137,8 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a])
 +- TemporalSort(orderBy=[rowtime ASC, c ASC])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+   +- Exchange(distribution=[single])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
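The plan changes above can also be inspected directly; a sketch, assuming the same MyTable registration used by SortTest and a table environment (tEnv) where explainSql is available:

    // Sketch: the optimized plan should now contain Exchange(distribution=[single])
    // directly below the TemporalSort node, matching the expected XML above.
    println(tEnv.explainSql("SELECT a FROM MyTable ORDER BY proctime, c"))
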
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
index de90d7e..ed90898 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
@@ -79,6 +79,49 @@ class TemporalSortITCase(mode: StateBackendMode) extends StreamingWithStateTestB
   }
 
   @Test
+  def testEventTimeOrderByWithParallelInput(): Unit = {
+    val data = List(
+      (3L, 2L, "Hello world", 3),
+      (2L, 2L, "Hello", 2),
+      (6L, 3L, "Luke Skywalker", 6),
+      (5L, 3L, "I am fine.", 5),
+      (7L, 4L, "Comment#1", 7),
+      (9L, 4L, "Comment#3", 9),
+      (10L, 4L, "Comment#4", 10),
+      (8L, 4L, "Comment#2", 8),
+      (1L, 1L, "Hi", 1),
+      (4L, 3L, "Helloworld, how are you?", 4))
+
+    val t = failingDataSource(data)
+      .assignTimestampsAndWatermarks(
+        new TimestampAndWatermarkWithOffset[(Long, Long, String, Int)](10L))
+      .setParallelism(env.getParallelism)
+      .toTable(tEnv, 'rowtime.rowtime, 'key, 'str, 'int)
+    tEnv.registerTable("T", t)
+
+    val sqlQuery = "SELECT key, str, `int` FROM T ORDER BY rowtime"
+
+    val sink = new TestingRetractSink
+    val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    results.addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq(
+      "1,Hi,1",
+      "2,Hello,2",
+      "2,Hello world,3",
+      "3,Helloworld, how are you?,4",
+      "3,I am fine.,5",
+      "3,Luke Skywalker,6",
+      "4,Comment#1,7",
+      "4,Comment#2,8",
+      "4,Comment#3,9",
+      "4,Comment#4,10")
+
+    assertEquals(expected, sink.getRetractResults)
+  }
+
+  @Test
   def testEventTimeAndOtherFieldOrderBy(): Unit = {
     val data = List(
       (3L, 2L, "Hello world", 3),