You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/29 06:43:39 UTC
[flink] 05/08: [FLINK-13774][table] FieldComputer should return
ResolvedExpression
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e9223939ad1efb839228032e43f0ca9d437d59f9
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Aug 22 12:52:28 2019 +0200
[FLINK-13774][table] FieldComputer should return ResolvedExpression
---
.../flink/table/sources/tsextractors/ExistingField.java | 10 ++++++----
.../flink/table/planner/sources/TableSourceUtil.scala | 15 +++++++++------
2 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
index 0e8d73f..f490949 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
@@ -22,18 +22,19 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedFieldReference;
import org.apache.flink.table.types.DataType;
import java.sql.Timestamp;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral;
-import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -99,10 +100,11 @@ public final class ExistingField extends TimestampExtractor {
case TIMESTAMP_WITHOUT_TIME_ZONE:
return fieldReferenceExpr;
case VARCHAR:
- return unresolvedCall(
+ DataType outputType = TIMESTAMP(3).bridgedTo(Timestamp.class);
+ return new CallExpression(
CAST,
- fieldReferenceExpr,
- typeLiteral(TIMESTAMP(3).bridgedTo(Timestamp.class)));
+ Arrays.asList(fieldReferenceExpr, typeLiteral(outputType)),
+ outputType);
default:
throw new RuntimeException("Unsupport type: " + type);
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index 489e3e2..d46849f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.planner.sources
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.{DataTypes, ValidationException}
-import org.apache.flink.table.expressions.ResolvedFieldReference
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, valueLiteral}
+import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, ResolvedFieldReference}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.expressions.RexNodeConverter
@@ -285,11 +285,14 @@ object TableSourceUtil {
// add cast to requested type and convert expression to RexNode
// blink runner treats numeric types as seconds in the cast of timestamp and numerical types.
// So we use REINTERPRET_CAST to keep the mills of numeric types.
- val castExpression = unresolvedCall(
+ val outputType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])
+ val castExpression = new CallExpression(
BuiltInFunctionDefinitions.REINTERPRET_CAST,
- expression,
- typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])),
- valueLiteral(false))
+ Seq(
+ expression.asInstanceOf[ResolvedExpression],
+ typeLiteral(outputType),
+ valueLiteral(false)),
+ outputType)
val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder))
relBuilder.clear()
rexExpression