You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/07/13 15:18:42 UTC
[flink] 02/02: [FLINK-16827][table-planner-blink]
StreamExecTemporalSort should require a distribution trait in
StreamExecTemporalSortRule.
This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 66353f27c4c6481443d1f04a8f23e7f98dd7beda
Author: libenchao <li...@gmail.com>
AuthorDate: Mon Apr 6 16:22:28 2020 +0800
[FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule.
This closes #11643
---
.../stream/StreamExecTemporalSortRule.scala | 15 ++++++--
.../table/planner/plan/stream/sql/SortTest.xml | 6 ++-
.../runtime/stream/sql/TemporalSortITCase.scala | 43 ++++++++++++++++++++++
3 files changed, 58 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
index cf17cc5..3490526 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
@@ -21,8 +21,9 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalSort
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelFieldCollation.Direction
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
@@ -46,12 +47,18 @@ class StreamExecTemporalSortRule
override def convert(rel: RelNode): RelNode = {
val sort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
val input = sort.getInput()
- val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
- val convInput: RelNode = RelOptRule.convert(input, FlinkConventions.STREAM_PHYSICAL)
+ val requiredTraitSet = input.getTraitSet
+ .replace(FlinkRelDistribution.SINGLETON)
+ .replace(FlinkConventions.STREAM_PHYSICAL)
+ val providedTraitSet = sort.getTraitSet
+ .replace(FlinkRelDistribution.SINGLETON)
+ .replace(FlinkConventions.STREAM_PHYSICAL)
+
+ val convInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
new StreamExecTemporalSort(
rel.getCluster,
- traitSet,
+ providedTraitSet,
convInput,
sort.collation)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
index af6e441..a1d191e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
@@ -32,7 +32,8 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a])
+- TemporalSort(orderBy=[proctime ASC, c ASC])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[single])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
@@ -136,7 +137,8 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a])
+- TemporalSort(orderBy=[rowtime ASC, c ASC])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[single])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
index de90d7e..ed90898 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
@@ -79,6 +79,49 @@ class TemporalSortITCase(mode: StateBackendMode) extends StreamingWithStateTestB
}
@Test
+ def testEventTimeOrderByWithParallelInput(): Unit = {
+ val data = List(
+ (3L, 2L, "Hello world", 3),
+ (2L, 2L, "Hello", 2),
+ (6L, 3L, "Luke Skywalker", 6),
+ (5L, 3L, "I am fine.", 5),
+ (7L, 4L, "Comment#1", 7),
+ (9L, 4L, "Comment#3", 9),
+ (10L, 4L, "Comment#4", 10),
+ (8L, 4L, "Comment#2", 8),
+ (1L, 1L, "Hi", 1),
+ (4L, 3L, "Helloworld, how are you?", 4))
+
+ val t = failingDataSource(data)
+ .assignTimestampsAndWatermarks(
+ new TimestampAndWatermarkWithOffset[(Long, Long, String, Int)](10L))
+ .setParallelism(env.getParallelism)
+ .toTable(tEnv, 'rowtime.rowtime, 'key, 'str, 'int)
+ tEnv.registerTable("T", t)
+
+ val sqlQuery = "SELECT key, str, `int` FROM T ORDER BY rowtime"
+
+ val sink = new TestingRetractSink
+ val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+ results.addSink(sink).setParallelism(1)
+ env.execute()
+
+ val expected = Seq(
+ "1,Hi,1",
+ "2,Hello,2",
+ "2,Hello world,3",
+ "3,Helloworld, how are you?,4",
+ "3,I am fine.,5",
+ "3,Luke Skywalker,6",
+ "4,Comment#1,7",
+ "4,Comment#2,8",
+ "4,Comment#3,9",
+ "4,Comment#4,10")
+
+ assertEquals(expected, sink.getRetractResults)
+ }
+
+ @Test
def testEventTimeAndOtherFieldOrderBy(): Unit = {
val data = List(
(3L, 2L, "Hello world", 3),