You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/29 06:43:39 UTC

[flink] 05/08: [FLINK-13774][table] FieldComputer should return ResolvedExpression

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e9223939ad1efb839228032e43f0ca9d437d59f9
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Aug 22 12:52:28 2019 +0200

    [FLINK-13774][table] FieldComputer should return ResolvedExpression
---
 .../flink/table/sources/tsextractors/ExistingField.java   | 10 ++++++----
 .../flink/table/planner/sources/TableSourceUtil.scala     | 15 +++++++++------
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
index 0e8d73f..f490949 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
@@ -22,18 +22,19 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedFieldReference;
 import org.apache.flink.table.types.DataType;
 
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral;
-import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
 import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -99,10 +100,11 @@ public final class ExistingField extends TimestampExtractor {
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
 				return fieldReferenceExpr;
 			case VARCHAR:
-				return unresolvedCall(
+				DataType outputType = TIMESTAMP(3).bridgedTo(Timestamp.class);
+				return new CallExpression(
 						CAST,
-						fieldReferenceExpr,
-						typeLiteral(TIMESTAMP(3).bridgedTo(Timestamp.class)));
+						Arrays.asList(fieldReferenceExpr, typeLiteral(outputType)),
+						outputType);
 			default:
 				throw new RuntimeException("Unsupport type: " + type);
 		}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index 489e3e2..d46849f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.planner.sources
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{DataTypes, ValidationException}
-import org.apache.flink.table.expressions.ResolvedFieldReference
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, valueLiteral}
+import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, ResolvedFieldReference}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.expressions.RexNodeConverter
@@ -285,11 +285,14 @@ object TableSourceUtil {
       // add cast to requested type and convert expression to RexNode
       // blink runner treats numeric types as seconds in the cast of timestamp and numerical types.
       // So we use REINTERPRET_CAST to keep the mills of numeric types.
-      val castExpression = unresolvedCall(
+      val outputType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])
+      val castExpression = new CallExpression(
         BuiltInFunctionDefinitions.REINTERPRET_CAST,
-        expression,
-        typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])),
-        valueLiteral(false))
+        Seq(
+          expression.asInstanceOf[ResolvedExpression],
+          typeLiteral(outputType),
+          valueLiteral(false)),
+        outputType)
       val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder))
       relBuilder.clear()
       rexExpression