You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/05/11 02:21:01 UTC

[flink] branch release-1.13 updated: [FLINK-22523][table-planner-blink] Window TVF should throw helpful exception when specifying offset parameter (#15803)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 1088b87  [FLINK-22523][table-planner-blink] Window TVF should throw helpful exception when specifying offset parameter (#15803)
1088b87 is described below

commit 1088b8726732d5121a40a88f38e2fe0bcefffb37
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue May 11 10:19:58 2021 +0800

    [FLINK-22523][table-planner-blink] Window TVF should throw helpful exception when specifying offset parameter (#15803)
---
 .../functions/sql/SqlCumulateTableFunction.java    |  5 +-
 .../planner/functions/sql/SqlHopTableFunction.java |  5 +-
 .../functions/sql/SqlTumbleTableFunction.java      |  7 +--
 .../plan/stream/sql/WindowTableFunctionTest.scala  | 55 ++++++++++++++++++++++
 4 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java
index f5e6c56..82eb710 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java
@@ -57,6 +57,9 @@ public class SqlCumulateTableFunction extends SqlWindowTableFunction {
             if (!checkIntervalOperands(callBinding, 2)) {
                 return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
             }
+            if (callBinding.getOperandCount() == 5) {
+                return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+            }
             // check time attribute
             return throwExceptionOrReturnFalse(
                     checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure);
@@ -66,7 +69,7 @@ public class SqlCumulateTableFunction extends SqlWindowTableFunction {
         public String getAllowedSignatures(SqlOperator op, String opName) {
             return opName
                     + "(TABLE table_name, DESCRIPTOR(timecol), "
-                    + "datetime interval, datetime interval[, datetime interval])";
+                    + "datetime interval, datetime interval)";
         }
     }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java
index 048a963..1d2a34f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java
@@ -57,6 +57,9 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
             if (!checkIntervalOperands(callBinding, 2)) {
                 return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
             }
+            if (callBinding.getOperandCount() == 5) {
+                return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+            }
             // check time attribute
             return throwExceptionOrReturnFalse(
                     checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure);
@@ -66,7 +69,7 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
         public String getAllowedSignatures(SqlOperator op, String opName) {
             return opName
                     + "(TABLE table_name, DESCRIPTOR(timecol), "
-                    + "datetime interval, datetime interval[, datetime interval])";
+                    + "datetime interval, datetime interval)";
         }
     }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java
index b0f3cb4..c3149fd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java
@@ -55,6 +55,9 @@ public class SqlTumbleTableFunction extends SqlWindowTableFunction {
             if (!checkIntervalOperands(callBinding, 2)) {
                 return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
             }
+            if (callBinding.getOperandCount() == 4) {
+                return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+            }
             // check time attribute
             return throwExceptionOrReturnFalse(
                     checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure);
@@ -62,9 +65,7 @@ public class SqlTumbleTableFunction extends SqlWindowTableFunction {
 
         @Override
         public String getAllowedSignatures(SqlOperator op, String opName) {
-            return opName
-                    + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval"
-                    + "[, datetime interval])";
+            return opName + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval)";
         }
     }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
index d3f93c8..640fade 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
@@ -140,4 +140,59 @@ class WindowTableFunctionTest extends TableTestBase {
     util.verifyExplain(sql)
   }
 
+  @Test
+  def testInvalidTumbleParameters(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(TUMBLE(
+        |   TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE, INTERVAL '5' MINUTE))
+        |""".stripMargin
+
+    thrown.expectMessage("Supported form(s): " +
+      "TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval)")
+    thrown.expect(classOf[ValidationException])
+    util.verifyExplain(sql)
+  }
+
+  @Test
+  def testInvalidHopParameters(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(
+        |  HOP(
+        |    TABLE MyTable,
+        |    DESCRIPTOR(rowtime),
+        |    INTERVAL '1' MINUTE,
+        |    INTERVAL '15' MINUTE,
+        |    INTERVAL '5' MINUTE))
+        |""".stripMargin
+
+    thrown.expectMessage("Supported form(s): " +
+      "HOP(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)")
+    thrown.expect(classOf[ValidationException])
+    util.verifyExplain(sql)
+  }
+
+  @Test
+  def testInvalidCumulateParameters(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM TABLE(
+        |  CUMULATE(
+        |    TABLE MyTable,
+        |    DESCRIPTOR(rowtime),
+        |    INTERVAL '1' MINUTE,
+        |    INTERVAL '15' MINUTE,
+        |    INTERVAL '5' MINUTE))
+        |""".stripMargin
+
+    thrown.expectMessage("Supported form(s): " +
+      "CUMULATE(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)")
+    thrown.expect(classOf[ValidationException])
+    util.verifyExplain(sql)
+  }
+
 }