You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/27 09:12:20 UTC
[flink] branch release-1.11 updated:
[FLINK-17751][table-planner-blink] Fix proctime defined in ddl can not work
with over window in Table api
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new a080062 [FLINK-17751][table-planner-blink] Fix proctime defined in ddl can not work with over window in Table api
a080062 is described below
commit a0800628fcd59ffbbf9889ad1b4fe3a4218b807e
Author: godfrey he <go...@163.com>
AuthorDate: Wed May 27 17:10:30 2020 +0800
[FLINK-17751][table-planner-blink] Fix proctime defined in ddl can not work with over window in Table api
This closes #12342
---
.../types/utils/LegacyTypeInfoDataTypeConverter.java | 2 +-
.../planner/plan/stream/table/TableSourceTest.xml | 18 ++++++++++++++++++
.../planner/plan/stream/table/TableSourceTest.scala | 3 +--
3 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
index 3059348..f78a256 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
@@ -281,7 +281,7 @@ public final class LegacyTypeInfoDataTypeConverter {
private static boolean canConvertToTimeAttributeTypeInfo(DataType dataType) {
return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
- dataTypeTypeInfoMap.containsKey(dataType) && // checks precision and conversion
+ dataTypeTypeInfoMap.containsKey(dataType.nullable()) && // checks precision and conversion and ignore nullable
((TimestampType) dataType.getLogicalType()).getKind() != TimestampKind.REGULAR;
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
index 3d4d078..b6dea7e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
@@ -82,6 +82,24 @@ Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
]]>
</Resource>
</TestCase>
+ <TestCase name="testProcTimeTableSourceOverWindow">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalFilter(condition=[>($2, 100)])
++- LogicalProject(id=[$0], name=[$2], valSum=[AS(SUM($1) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'valSum')])
+ +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[id, name, w0$o0 AS valSum], where=[>(w0$o0, 100)])
++- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS w0$o0])
+ +- Exchange(distribution=[hash[id]])
+ +- Calc(select=[id, val, name, PROCTIME() AS $3])
+ +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testProjectWithRowtimeProctime">
<Resource name="planBefore">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
index c51801e..36b273d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
@@ -25,7 +25,7 @@ import org.junit.{Ignore, Test}
class TableSourceTest extends TableTestBase {
- val util = streamTestUtil()
+ private val util = streamTestUtil()
@Test
def testTableSourceWithTimestampRowTimeField(): Unit = {
@@ -95,7 +95,6 @@ class TableSourceTest extends TableTestBase {
util.verifyPlan(t)
}
- @Ignore("remove ignore once FLINK-17751 is fixed")
@Test
def testProcTimeTableSourceOverWindow(): Unit = {
val ddl =