You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2024/03/20 14:15:00 UTC
(flink) branch master updated: [FLINK-34745] Improve validations for a period in Time Travel (#24534)
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4142c4386a9 [FLINK-34745] Improve validations for a period in Time Travel (#24534)
4142c4386a9 is described below
commit 4142c4386a92f1ec5016583f4832f8869782765e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Mar 20 15:14:52 2024 +0100
[FLINK-34745] Improve validations for a period in Time Travel (#24534)
---
.../planner/calcite/FlinkCalciteSqlValidator.java | 22 ++++++++++++++++++----
.../plan/stream/sql/join/TemporalJoinTest.scala | 21 +++++++++++++++++++++
2 files changed, 39 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index f091ab3e70a..4eb652dde60 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -59,6 +59,7 @@ import org.apache.calcite.sql.SqlTableFunction;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindowTableFunction;
import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.DelegatingScope;
import org.apache.calcite.sql.validate.IdentifierNamespace;
import org.apache.calcite.sql.validate.IdentifierSnapshotNamespace;
@@ -203,7 +204,7 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
Optional<SqlSnapshot> snapshot = getSnapShotNode(ns);
if (usingScope != null
&& snapshot.isPresent()
- && !(snapshot.get().getPeriod() instanceof SqlIdentifier)) {
+ && !(hasInputReference(snapshot.get().getPeriod()))) {
SqlSnapshot sqlSnapshot = snapshot.get();
SqlNode periodNode = sqlSnapshot.getPeriod();
SqlToRelConverter sqlToRelConverter = this.createSqlToRelConverter();
@@ -222,14 +223,23 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
Collections.singletonList(simplifiedRexNode),
reducedNodes);
// check whether period is the unsupported expression
- if (!(reducedNodes.get(0) instanceof RexLiteral)) {
- throw new UnsupportedOperationException(
+ final RexNode reducedNode = reducedNodes.get(0);
+ if (!(reducedNode instanceof RexLiteral)) {
+ throw new ValidationException(
String.format(
"Unsupported time travel expression: %s for the expression can not be reduced to a constant by Flink.",
periodNode));
}
- RexLiteral rexLiteral = (RexLiteral) (reducedNodes).get(0);
+ RexLiteral rexLiteral = (RexLiteral) reducedNode;
+ final RelDataType sqlType = rexLiteral.getType();
+ if (!SqlTypeUtil.isTimestamp(sqlType)) {
+ throw newValidationError(
+ periodNode,
+ Static.RESOURCE.illegalExpressionForTemporal(
+ sqlType.getSqlTypeName().getName()));
+ }
+
TimestampString timestampString = rexLiteral.getValueAs(TimestampString.class);
checkNotNull(
timestampString,
@@ -264,6 +274,10 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
super.registerNamespace(usingScope, alias, ns, forceNullable);
}
+ private static boolean hasInputReference(SqlNode node) {
+ return node.accept(new SqlToRelConverter.SqlIdentifierFinder());
+ }
+
/**
* Get the {@link SqlSnapshot} node in a {@link SqlValidatorNamespace}.
*
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
index 29d6521540c..351cc8429dd 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
@@ -508,6 +508,27 @@ class TemporalJoinTest extends TableTestBase {
" table, but the rowtime types are TIMESTAMP_LTZ(3) *ROWTIME* and TIMESTAMP(3) *ROWTIME*.",
classOf[ValidationException]
)
+
+ val sqlQuery9 = "SELECT * " +
+ "FROM Orders AS o JOIN " +
+ "RatesHistoryWithPK FOR SYSTEM_TIME AS OF 'o.rowtime' AS r " +
+ "ON o.currency = r.currency"
+ expectExceptionThrown(
+ sqlQuery9,
+ "The system time period specification expects Timestamp type but is 'CHAR'",
+ classOf[ValidationException]
+ )
+
+ val sqlQuery10 = "SELECT * " +
+ "FROM Orders AS o JOIN " +
+ "RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime + INTERVAL '1' SECOND AS r " +
+ "ON o.currency = r.currency"
+ expectExceptionThrown(
+ sqlQuery10,
+ "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time" +
+ " attribute field'",
+ classOf[ValidationException]
+ )
}
@Test