You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/05/11 02:21:01 UTC
[flink] branch release-1.13 updated: [FLINK-22523][table-planner-blink] Window TVF should throw helpful exception when specifying offset parameter (#15803)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 1088b87 [FLINK-22523][table-planner-blink] Window TVF should throw helpful exception when specifying offset parameter (#15803)
1088b87 is described below
commit 1088b8726732d5121a40a88f38e2fe0bcefffb37
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue May 11 10:19:58 2021 +0800
[FLINK-22523][table-planner-blink] Window TVF should throw helpful exception when specifying offset parameter (#15803)
---
.../functions/sql/SqlCumulateTableFunction.java | 5 +-
.../planner/functions/sql/SqlHopTableFunction.java | 5 +-
.../functions/sql/SqlTumbleTableFunction.java | 7 +--
.../plan/stream/sql/WindowTableFunctionTest.scala | 55 ++++++++++++++++++++++
4 files changed, 67 insertions(+), 5 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java
index f5e6c56..82eb710 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCumulateTableFunction.java
@@ -57,6 +57,9 @@ public class SqlCumulateTableFunction extends SqlWindowTableFunction {
if (!checkIntervalOperands(callBinding, 2)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
+ if (callBinding.getOperandCount() == 5) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
// check time attribute
return throwExceptionOrReturnFalse(
checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure);
@@ -66,7 +69,7 @@ public class SqlCumulateTableFunction extends SqlWindowTableFunction {
public String getAllowedSignatures(SqlOperator op, String opName) {
return opName
+ "(TABLE table_name, DESCRIPTOR(timecol), "
- + "datetime interval, datetime interval[, datetime interval])";
+ + "datetime interval, datetime interval)";
}
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java
index 048a963..1d2a34f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlHopTableFunction.java
@@ -57,6 +57,9 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
if (!checkIntervalOperands(callBinding, 2)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
+ if (callBinding.getOperandCount() == 5) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
// check time attribute
return throwExceptionOrReturnFalse(
checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure);
@@ -66,7 +69,7 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
public String getAllowedSignatures(SqlOperator op, String opName) {
return opName
+ "(TABLE table_name, DESCRIPTOR(timecol), "
- + "datetime interval, datetime interval[, datetime interval])";
+ + "datetime interval, datetime interval)";
}
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java
index b0f3cb4..c3149fd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlTumbleTableFunction.java
@@ -55,6 +55,9 @@ public class SqlTumbleTableFunction extends SqlWindowTableFunction {
if (!checkIntervalOperands(callBinding, 2)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
+ if (callBinding.getOperandCount() == 4) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
// check time attribute
return throwExceptionOrReturnFalse(
checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure);
@@ -62,9 +65,7 @@ public class SqlTumbleTableFunction extends SqlWindowTableFunction {
@Override
public String getAllowedSignatures(SqlOperator op, String opName) {
- return opName
- + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval"
- + "[, datetime interval])";
+ return opName + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval)";
}
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
index d3f93c8..640fade 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
@@ -140,4 +140,59 @@ class WindowTableFunctionTest extends TableTestBase {
util.verifyExplain(sql)
}
+ @Test
+ def testInvalidTumbleParameters(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(TUMBLE(
+ | TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE, INTERVAL '5' MINUTE))
+ |""".stripMargin
+
+ thrown.expectMessage("Supported form(s): " +
+ "TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval)")
+ thrown.expect(classOf[ValidationException])
+ util.verifyExplain(sql)
+ }
+
+ @Test
+ def testInvalidHopParameters(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(
+ | HOP(
+ | TABLE MyTable,
+ | DESCRIPTOR(rowtime),
+ | INTERVAL '1' MINUTE,
+ | INTERVAL '15' MINUTE,
+ | INTERVAL '5' MINUTE))
+ |""".stripMargin
+
+ thrown.expectMessage("Supported form(s): " +
+ "HOP(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)")
+ thrown.expect(classOf[ValidationException])
+ util.verifyExplain(sql)
+ }
+
+ @Test
+ def testInvalidCumulateParameters(): Unit = {
+ val sql =
+ """
+ |SELECT *
+ |FROM TABLE(
+ | CUMULATE(
+ | TABLE MyTable,
+ | DESCRIPTOR(rowtime),
+ | INTERVAL '1' MINUTE,
+ | INTERVAL '15' MINUTE,
+ | INTERVAL '5' MINUTE))
+ |""".stripMargin
+
+ thrown.expectMessage("Supported form(s): " +
+ "CUMULATE(TABLE table_name, DESCRIPTOR(timecol), datetime interval, datetime interval)")
+ thrown.expect(classOf[ValidationException])
+ util.verifyExplain(sql)
+ }
+
}