You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2024/03/20 14:15:00 UTC

(flink) branch master updated: [FLINK-34745] Improve validations for a period in Time Travel (#24534)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4142c4386a9 [FLINK-34745] Improve validations for a period in Time Travel (#24534)
4142c4386a9 is described below

commit 4142c4386a92f1ec5016583f4832f8869782765e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Mar 20 15:14:52 2024 +0100

    [FLINK-34745] Improve validations for a period in Time Travel (#24534)
---
 .../planner/calcite/FlinkCalciteSqlValidator.java  | 22 ++++++++++++++++++----
 .../plan/stream/sql/join/TemporalJoinTest.scala    | 21 +++++++++++++++++++++
 2 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index f091ab3e70a..4eb652dde60 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -59,6 +59,7 @@ import org.apache.calcite.sql.SqlTableFunction;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWindowTableFunction;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.DelegatingScope;
 import org.apache.calcite.sql.validate.IdentifierNamespace;
 import org.apache.calcite.sql.validate.IdentifierSnapshotNamespace;
@@ -203,7 +204,7 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
         Optional<SqlSnapshot> snapshot = getSnapShotNode(ns);
         if (usingScope != null
                 && snapshot.isPresent()
-                && !(snapshot.get().getPeriod() instanceof SqlIdentifier)) {
+                && !(hasInputReference(snapshot.get().getPeriod()))) {
             SqlSnapshot sqlSnapshot = snapshot.get();
             SqlNode periodNode = sqlSnapshot.getPeriod();
             SqlToRelConverter sqlToRelConverter = this.createSqlToRelConverter();
@@ -222,14 +223,23 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
                             Collections.singletonList(simplifiedRexNode),
                             reducedNodes);
             // check whether period is the unsupported expression
-            if (!(reducedNodes.get(0) instanceof RexLiteral)) {
-                throw new UnsupportedOperationException(
+            final RexNode reducedNode = reducedNodes.get(0);
+            if (!(reducedNode instanceof RexLiteral)) {
+                throw new ValidationException(
                         String.format(
                                 "Unsupported time travel expression: %s for the expression can not be reduced to a constant by Flink.",
                                 periodNode));
             }
 
-            RexLiteral rexLiteral = (RexLiteral) (reducedNodes).get(0);
+            RexLiteral rexLiteral = (RexLiteral) reducedNode;
+            final RelDataType sqlType = rexLiteral.getType();
+            if (!SqlTypeUtil.isTimestamp(sqlType)) {
+                throw newValidationError(
+                        periodNode,
+                        Static.RESOURCE.illegalExpressionForTemporal(
+                                sqlType.getSqlTypeName().getName()));
+            }
+
             TimestampString timestampString = rexLiteral.getValueAs(TimestampString.class);
             checkNotNull(
                     timestampString,
@@ -264,6 +274,10 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
         super.registerNamespace(usingScope, alias, ns, forceNullable);
     }
 
+    private static boolean hasInputReference(SqlNode node) {
+        return node.accept(new SqlToRelConverter.SqlIdentifierFinder());
+    }
+
     /**
      * Get the {@link SqlSnapshot} node in a {@link SqlValidatorNamespace}.
      *
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
index 29d6521540c..351cc8429dd 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/TemporalJoinTest.scala
@@ -508,6 +508,27 @@ class TemporalJoinTest extends TableTestBase {
         " table, but the rowtime types are TIMESTAMP_LTZ(3) *ROWTIME* and TIMESTAMP(3) *ROWTIME*.",
       classOf[ValidationException]
     )
+
+    val sqlQuery9 = "SELECT * " +
+      "FROM Orders AS o JOIN " +
+      "RatesHistoryWithPK FOR SYSTEM_TIME AS OF 'o.rowtime' AS r " +
+      "ON o.currency = r.currency"
+    expectExceptionThrown(
+      sqlQuery9,
+      "The system time period specification expects Timestamp type but is 'CHAR'",
+      classOf[ValidationException]
+    )
+
+    val sqlQuery10 = "SELECT * " +
+      "FROM Orders AS o JOIN " +
+      "RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime + INTERVAL '1' SECOND AS r " +
+      "ON o.currency = r.currency"
+    expectExceptionThrown(
+      sqlQuery10,
+      "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time" +
+        " attribute field'",
+      classOf[ValidationException]
+    )
   }
 
   @Test