You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/27 13:38:19 UTC

[flink] 01/02: [FLINK-12254][table] Update cast() and TypeLiteralExpression to new type system

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d1c1d55841479cd3beffd22870714a31ab9ac46
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 22 10:25:02 2019 +0200

    [FLINK-12254][table] Update cast() and TypeLiteralExpression to new type system
---
 .../table/expressions/ApiExpressionUtils.java      | 30 +++--------------
 .../table/expressions/TypeLiteralExpression.java   | 21 ++++++------
 .../flink/table/expressions/ExpressionBuilder.java |  3 +-
 .../flink/table/expressions/RexNodeConverter.java  |  7 ++--
 .../flink/table/sources/TableSourceUtil.scala      | 19 +++++------
 .../table/sources/tsextractors/ExistingField.scala | 15 ++++++---
 .../rules/ResolveCallByArgumentsRule.java          |  5 ++-
 .../flink/table/api/scala/expressionDsl.scala      | 25 ++++++++++----
 .../expressions/PlannerExpressionConverter.scala   |  3 +-
 .../expressions/PlannerExpressionParserImpl.scala  | 38 +++++++++++++++++-----
 10 files changed, 95 insertions(+), 71 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index 1ef8db3..47cfe77 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -23,14 +23,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
 
 import java.util.Arrays;
 import java.util.Optional;
 
-import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.CAST;
-import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIMES;
-
 /**
  * Utilities for API-specific {@link Expression}s.
  */
@@ -61,8 +59,8 @@ public final class ApiExpressionUtils {
 		return new ValueLiteralExpression(value, type);
 	}
 
-	public static TypeLiteralExpression typeLiteral(TypeInformation<?> type) {
-		return new TypeLiteralExpression(type);
+	public static TypeLiteralExpression typeLiteral(DataType dataType) {
+		return new TypeLiteralExpression(dataType);
 	}
 
 	public static SymbolExpression symbol(TableSymbol symbol) {
@@ -85,17 +83,7 @@ public final class ApiExpressionUtils {
 		// check for constant
 		return ExpressionUtils.extractValue(e, BasicTypeInfo.INT_TYPE_INFO)
 			.map((v) -> (Expression) valueLiteral(v * multiplier, TimeIntervalTypeInfo.INTERVAL_MONTHS))
-			.orElse(
-				call(
-					CAST,
-					call(
-						TIMES,
-						e,
-						valueLiteral(multiplier)
-					),
-					typeLiteral(TimeIntervalTypeInfo.INTERVAL_MONTHS)
-				)
-			);
+			.orElseThrow(() -> new ValidationException("Only constant intervals are supported: " + e));
 	}
 
 	public static Expression toMilliInterval(Expression e, long multiplier) {
@@ -110,15 +98,7 @@ public final class ApiExpressionUtils {
 		} else if (longInterval.isPresent()) {
 			return longInterval.get();
 		}
-		return call(
-			CAST,
-			call(
-				TIMES,
-				e,
-				valueLiteral(multiplier)
-			),
-			typeLiteral(TimeIntervalTypeInfo.INTERVAL_MONTHS)
-		);
+		throw new ValidationException("Only constant intervals are supported:" + e);
 	}
 
 	public static Expression toRowInterval(Expression e) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/TypeLiteralExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/TypeLiteralExpression.java
index e7ff10c..f50ab00 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/TypeLiteralExpression.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/TypeLiteralExpression.java
@@ -19,8 +19,7 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.utils.TypeStringUtils;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
@@ -28,19 +27,19 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * Expression that wraps {@link TypeInformation} as a literal.
+ * Expression that wraps {@link DataType} as a literal.
  */
 @PublicEvolving
 public final class TypeLiteralExpression implements Expression {
 
-	private final TypeInformation<?> type;
+	private final DataType dataType;
 
-	public TypeLiteralExpression(TypeInformation<?> type) {
-		this.type = Preconditions.checkNotNull(type);
+	public TypeLiteralExpression(DataType dataType) {
+		this.dataType = Preconditions.checkNotNull(dataType, "Data type must not be null.");
 	}
 
-	public TypeInformation<?> getType() {
-		return type;
+	public DataType getDataType() {
+		return dataType;
 	}
 
 	@Override
@@ -62,16 +61,16 @@ public final class TypeLiteralExpression implements Expression {
 			return false;
 		}
 		TypeLiteralExpression that = (TypeLiteralExpression) o;
-		return Objects.equals(type, that.type);
+		return dataType.equals(that.dataType);
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(type);
+		return Objects.hash(dataType);
 	}
 
 	@Override
 	public String toString() {
-		return TypeStringUtils.writeTypeInfo(type);
+		return dataType.toString();
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/ExpressionBuilder.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/ExpressionBuilder.java
index cc5333e..3f011cb 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/ExpressionBuilder.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/ExpressionBuilder.java
@@ -41,6 +41,7 @@ import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.PLUS
 import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.REINTERPRET_CAST;
 import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIMES;
 import static org.apache.flink.table.expressions.InternalFunctionDefinitions.THROW_EXCEPTION;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
  * Builder for {@link Expression}s.
@@ -130,7 +131,7 @@ public class ExpressionBuilder {
 	}
 
 	public static TypeLiteralExpression typeLiteral(TypeInformation<?> type) {
-		return new TypeLiteralExpression(type);
+		return new TypeLiteralExpression(fromLegacyInfoToDataType(type));
 	}
 
 	public static Expression concat(Expression input1, Expression input2) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index bbd3d12..034ae67 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -48,6 +48,7 @@ import java.util.stream.Collectors;
 import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
 import static org.apache.flink.table.calcite.FlinkTypeFactory.toInternalType;
 import static org.apache.flink.table.type.TypeConverters.createInternalTypeFromTypeInfo;
+import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
 import static org.apache.flink.table.typeutils.TypeCheckUtils.isString;
 import static org.apache.flink.table.typeutils.TypeCheckUtils.isTemporal;
 import static org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval;
@@ -88,7 +89,8 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 			TypeLiteralExpression type = (TypeLiteralExpression) call.getChildren().get(1);
 			return relBuilder.getRexBuilder().makeAbstractCast(
 					typeFactory.createTypeFromInternalType(
-							createInternalTypeFromTypeInfo(type.getType()),
+							createInternalTypeFromTypeInfo(
+								fromDataTypeToLegacyInfo(type.getDataType())),
 							child.getType().isNullable()),
 					child);
 		} else if (call.getFunctionDefinition().equals(BuiltInFunctionDefinitions.REINTERPRET_CAST)) {
@@ -97,7 +99,8 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 			RexNode checkOverflow = call.getChildren().get(2).accept(this);
 			return relBuilder.getRexBuilder().makeReinterpretCast(
 					typeFactory.createTypeFromInternalType(
-							createInternalTypeFromTypeInfo(type.getType()),
+							createInternalTypeFromTypeInfo(
+								fromDataTypeToLegacyInfo(type.getDataType())),
 							child.getType().isNullable()),
 					child,
 					checkOverflow);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index f6b22f6..a4dc4c7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -18,15 +18,6 @@
 
 package org.apache.flink.table.sources
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.`type`.TypeConverters
-import org.apache.flink.table.api.{Types, ValidationException}
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.`type`.InternalType
-import org.apache.flink.table.`type`.InternalTypes.{BYTE, PROCTIME_BATCH_MARKER, PROCTIME_INDICATOR, PROCTIME_STREAM_MARKER, ROWTIME_BATCH_MARKER, ROWTIME_INDICATOR, ROWTIME_STREAM_MARKER}
-import org.apache.flink.table.expressions.{BuiltInFunctionDefinitions, CallExpression, PlannerResolvedFieldReference, ResolvedFieldReference, RexNodeConverter, TypeLiteralExpression}
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptCluster
 import org.apache.calcite.rel.RelNode
@@ -34,6 +25,14 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalValues
 import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.`type`.InternalTypes._
+import org.apache.flink.table.`type`.{InternalType, TypeConverters}
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 
 import scala.collection.JavaConversions._
 
@@ -277,7 +276,7 @@ object TableSourceUtil {
       // add cast to requested type and convert expression to RexNode
       val castExpression = new CallExpression(
         BuiltInFunctionDefinitions.CAST,
-        List(expression, new TypeLiteralExpression(resultType)))
+        List(expression, new TypeLiteralExpression(fromLegacyInfoToDataType(resultType))))
       val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder))
       relBuilder.clear()
       rexExpression
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index 1e1388e..d76334c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -18,15 +18,16 @@
 
 package org.apache.flink.table.sources.tsextractors
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.`type`.DecimalType
 import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.descriptors.Rowtime
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.`type`.DecimalType
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.table.typeutils.DecimalTypeInfo
 
-import java.util
-
 import scala.collection.JavaConversions._
 
 /**
@@ -80,13 +81,17 @@ final class ExistingField(val field: String) extends TimestampExtractor {
         )
         new CallExpression(
           BuiltInFunctionDefinitions.CAST,
-          List(innerDiv, new TypeLiteralExpression(Types.SQL_TIMESTAMP)))
+          List(
+            innerDiv,
+            new TypeLiteralExpression(fromLegacyInfoToDataType(Types.SQL_TIMESTAMP))))
       case Types.SQL_TIMESTAMP =>
         fieldReferenceExpr
       case Types.STRING =>
         new CallExpression(
           BuiltInFunctionDefinitions.CAST,
-          List(fieldReferenceExpr, new TypeLiteralExpression(Types.SQL_TIMESTAMP)))
+          List(
+            fieldReferenceExpr,
+            new TypeLiteralExpression(fromLegacyInfoToDataType(Types.SQL_TIMESTAMP))))
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
index bebb683..7749b75 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
@@ -36,6 +36,7 @@ import java.util.stream.IntStream;
 
 import static java.util.Arrays.asList;
 import static org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava;
 
 /**
@@ -108,7 +109,9 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 			} else if (TypeCoercion.canSafelyCast(actualType, expectedType)) {
 				return new CallExpression(
 					BuiltInFunctionDefinitions.CAST,
-					asList(childExpression, typeLiteral(expectedType))
+					asList(
+						childExpression,
+						typeLiteral(fromLegacyInfoToDataType(expectedType)))
 				);
 			} else {
 				throw new ValidationException(String.format("Incompatible type of argument: %s Expected: %s",
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index a38d956..34d228f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -24,10 +24,12 @@ import java.sql.{Date, Time, Timestamp}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.api.{Over, Table, ValidationException}
 import org.apache.flink.table.expressions.ApiExpressionUtils._
-import org.apache.flink.table.expressions.BuiltInFunctionDefinitions.{WITH_COLUMNS, RANGE_TO, E => FDE, UUID => FDUUID, _}
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions.{RANGE_TO, WITH_COLUMNS, E => FDE, UUID => FDUUID, _}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction, getResultTypeOfAggregateFunction}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction}
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 
 import _root_.scala.language.implicitConversions
 
@@ -262,16 +264,23 @@ trait ImplicitExpressionOperations {
   def collect: Expression = call(COLLECT, expr)
 
   /**
-    * Converts a value to a given type.
+    * Converts a value to a given data type.
     *
-    * e.g. "42".cast(Types.INT) leads to 42.
+    * e.g. "42".cast(DataTypes.INT()) leads to 42.
     *
     * @return casted expression
     */
-  def cast(toType: TypeInformation[_]): Expression =
+  def cast(toType: DataType): Expression =
     call(CAST, expr, typeLiteral(toType))
 
   /**
+    * @deprecated Use [[cast(DataType)]] instead.
+    */
+  @deprecated
+  def cast(toType: TypeInformation[_]): Expression =
+    call(CAST, expr, typeLiteral(fromLegacyInfoToDataType(toType)))
+
+  /**
     * Specifies a name for an expression i.e. a field.
     *
     * @param name name for one field
@@ -683,18 +692,20 @@ trait ImplicitExpressionOperations {
   /**
     * Parses a date string in the form "yyyy-MM-dd" to a SQL Date.
     */
-  def toDate: Expression = call(CAST, expr, typeLiteral(SqlTimeTypeInfo.DATE))
+  def toDate: Expression =
+    call(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))
 
   /**
     * Parses a time string in the form "HH:mm:ss" to a SQL Time.
     */
-  def toTime: Expression = call(CAST, expr, typeLiteral(SqlTimeTypeInfo.TIME))
+  def toTime: Expression =
+    call(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))
 
   /**
     * Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp.
     */
   def toTimestamp: Expression =
-    call(CAST, expr, typeLiteral(SqlTimeTypeInfo.TIMESTAMP))
+    call(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))
 
   /**
     * Extracts parts of a time point or time interval. Returns the part as a long value.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 2eb8706..0134048 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.expressions
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.expressions.BuiltInFunctionDefinitions._
 import org.apache.flink.table.expressions.{E => PlannerE, UUID => PlannerUUID}
+import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 
 import _root_.scala.collection.JavaConverters._
 
@@ -39,7 +40,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         assert(children.size == 2)
         return Cast(
           children.head.accept(this),
-          children(1).asInstanceOf[TypeLiteralExpression].getType)
+          fromDataTypeToLegacyInfo(children(1).asInstanceOf[TypeLiteralExpression].getDataType))
 
       case WINDOW_START =>
         assert(children.size == 1)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
index 3850c3f..74c0089 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
@@ -23,6 +23,7 @@ import _root_.java.util.{List => JList}
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.api._
 import org.apache.flink.table.expressions.ApiExpressionUtils._
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 
 import _root_.scala.collection.JavaConversions._
 import _root_.scala.language.implicitConversions
@@ -260,7 +261,7 @@ object PlannerExpressionParserImpl extends JavaTokenParsers
   lazy val suffixCast: PackratParser[Expression] =
     composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
       case e ~ _ ~ _ ~ _ ~ dt ~ _ =>
-        call(BuiltInFunctionDefinitions.CAST, e, typeLiteral(dt))
+        call(BuiltInFunctionDefinitions.CAST, e, typeLiteral(fromLegacyInfoToDataType(dt)))
     }
 
   lazy val suffixTrim: PackratParser[Expression] =
@@ -331,17 +332,26 @@ object PlannerExpressionParserImpl extends JavaTokenParsers
 
   lazy val suffixToDate: PackratParser[Expression] =
     composite <~ "." ~ TO_DATE ~ opt("()") ^^ { e =>
-      call(BuiltInFunctionDefinitions.CAST, e, typeLiteral(SqlTimeTypeInfo.DATE))
+      call(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))
     }
 
   lazy val suffixToTimestamp: PackratParser[Expression] =
     composite <~ "." ~ TO_TIMESTAMP ~ opt("()") ^^ { e =>
-      call(BuiltInFunctionDefinitions.CAST, e, typeLiteral(SqlTimeTypeInfo.TIMESTAMP))
+      call(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))
     }
 
   lazy val suffixToTime: PackratParser[Expression] =
     composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e =>
-      call(BuiltInFunctionDefinitions.CAST, e, typeLiteral(SqlTimeTypeInfo.TIME))
+      call(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))
     }
 
   lazy val suffixTimeInterval : PackratParser[Expression] =
@@ -420,7 +430,10 @@ object PlannerExpressionParserImpl extends JavaTokenParsers
   lazy val prefixCast: PackratParser[Expression] =
     CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
       case _ ~ _ ~ e ~ _ ~ dt ~ _ =>
-        call(BuiltInFunctionDefinitions.CAST, e, typeLiteral(dt))
+        call(
+          BuiltInFunctionDefinitions.CAST,
+          e,
+          typeLiteral(fromLegacyInfoToDataType(dt)))
     }
 
   lazy val prefixIf: PackratParser[Expression] =
@@ -500,17 +513,26 @@ object PlannerExpressionParserImpl extends JavaTokenParsers
 
   lazy val prefixToDate: PackratParser[Expression] =
     TO_DATE ~ "(" ~> expression <~ ")" ^^ { e =>
-      call(BuiltInFunctionDefinitions.CAST, e, typeLiteral(SqlTimeTypeInfo.DATE))
+      call(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))
     }
 
   lazy val prefixToTimestamp: PackratParser[Expression] =
     TO_TIMESTAMP ~ "(" ~> expression <~ ")" ^^ { e =>
-      call(BuiltInFunctionDefinitions.CAST, e, typeLiteral(SqlTimeTypeInfo.TIMESTAMP))
+      call(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))
     }
 
   lazy val prefixToTime: PackratParser[Expression] =
     TO_TIME ~ "(" ~> expression <~ ")" ^^ { e =>
-      call(BuiltInFunctionDefinitions.CAST, e, typeLiteral(SqlTimeTypeInfo.TIME))
+      call(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))
     }
 
   lazy val prefixDistinct: PackratParser[Expression] =