You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/27 09:12:20 UTC

[flink] branch release-1.11 updated: [FLINK-17751][table-planner-blink] Fix proctime defined in ddl can not work with over window in Table api

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new a080062  [FLINK-17751][table-planner-blink] Fix proctime defined in ddl can not work with over window in Table api
a080062 is described below

commit a0800628fcd59ffbbf9889ad1b4fe3a4218b807e
Author: godfrey he <go...@163.com>
AuthorDate: Wed May 27 17:10:30 2020 +0800

    [FLINK-17751][table-planner-blink] Fix proctime defined in ddl can not work with over window in Table api
    
    This closes #12342
---
 .../types/utils/LegacyTypeInfoDataTypeConverter.java   |  2 +-
 .../planner/plan/stream/table/TableSourceTest.xml      | 18 ++++++++++++++++++
 .../planner/plan/stream/table/TableSourceTest.scala    |  3 +--
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
index 3059348..f78a256 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
@@ -281,7 +281,7 @@ public final class LegacyTypeInfoDataTypeConverter {
 
 	private static boolean canConvertToTimeAttributeTypeInfo(DataType dataType) {
 		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
-			dataTypeTypeInfoMap.containsKey(dataType) && // checks precision and conversion
+			dataTypeTypeInfoMap.containsKey(dataType.nullable()) && // checks precision and conversion and ignore nullable
 			((TimestampType) dataType.getLogicalType()).getKind() != TimestampKind.REGULAR;
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
index 3d4d078..b6dea7e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
@@ -82,6 +82,24 @@ Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testProcTimeTableSourceOverWindow">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalFilter(condition=[>($2, 100)])
++- LogicalProject(id=[$0], name=[$2], valSum=[AS(SUM($1) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'valSum')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[id, name, w0$o0 AS valSum], where=[>(w0$o0, 100)])
++- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS w0$o0])
+   +- Exchange(distribution=[hash[id]])
+      +- Calc(select=[id, val, name, PROCTIME() AS $3])
+         +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testProjectWithRowtimeProctime">
     <Resource name="planBefore">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
index c51801e..36b273d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
@@ -25,7 +25,7 @@ import org.junit.{Ignore, Test}
 
 class TableSourceTest extends TableTestBase {
 
-  val util = streamTestUtil()
+  private val util = streamTestUtil()
 
   @Test
   def testTableSourceWithTimestampRowTimeField(): Unit = {
@@ -95,7 +95,6 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan(t)
   }
 
-  @Ignore("remove ignore once FLINK-17751 is fixed")
   @Test
   def testProcTimeTableSourceOverWindow(): Unit = {
     val ddl =