You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 12:18:39 UTC

[flink] branch release-1.9 updated (41e56d9 -> c43dc95)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 41e56d9  [FLINK-12578][build] Add fallback unsafe MapR repository
     new 07e70a9  [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
     new fa5c867  [FLINK-13287][table-planner] Support Reinterpret cast call in blink planner
     new 121e35e  [FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer
     new c43dc95  [FLINK-13287][table-api] Port StreamRecordTimestamp to api-java

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/sources/tsextractors/ExistingField.java  | 136 +++++++++++++++++++++
 .../tsextractors/StreamRecordTimestamp.java        |  75 ++++++++++++
 .../table/expressions/ResolvedFieldReference.java  |  29 ++++-
 .../flink/table/expressions/RexNodeConverter.java  |   2 +
 .../expressions/ExestingFieldFieldReference.scala  |  26 ----
 .../expressions/PlannerExpressionConverter.scala   |  12 ++
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../expressions/{cast.scala => Reinterpret.scala}  |  15 +--
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../flink/table/sources/TableSourceUtil.scala      |  13 +-
 .../table/sources/tsextractors/ExistingField.scala | 111 -----------------
 .../ExtendedAggregateExtractProjectRule.java       |   4 +-
 .../expressions/PlannerExpressionConverter.scala   |   4 +
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../DataStreamGroupWindowAggregateBase.scala       |   4 +-
 .../flink/table/sources/TableSourceUtil.scala      |  29 +++--
 .../table/sources/tsextractors/ExistingField.scala |  88 -------------
 .../tsextractors/StreamRecordTimestamp.scala       |  67 ----------
 .../flink/table/descriptors/RowtimeTest.scala      |  19 ++-
 .../table/utils/TestFilterableTableSource.scala    |  22 ++--
 25 files changed, 331 insertions(+), 353 deletions(-)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
 copy flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/{cast.scala => Reinterpret.scala} (76%)
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala


[flink] 03/04: [FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 121e35ec66dbaf40ca763e1a8551c480cea03215
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 17 20:23:37 2019 +0800

    [FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer
---
 .../table/sources/tsextractors/ExistingField.java  | 136 +++++++++++++++++++++
 .../table/expressions/ResolvedFieldReference.java  |  29 ++++-
 .../expressions/ExestingFieldFieldReference.scala  |  26 ----
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../flink/table/sources/TableSourceUtil.scala      |  13 +-
 .../table/sources/tsextractors/ExistingField.scala | 111 -----------------
 .../ExtendedAggregateExtractProjectRule.java       |   4 +-
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../DataStreamGroupWindowAggregateBase.scala       |   4 +-
 .../flink/table/sources/TableSourceUtil.scala      |  29 +++--
 .../table/sources/tsextractors/ExistingField.scala |  88 -------------
 .../flink/table/descriptors/RowtimeTest.scala      |  19 ++-
 .../table/utils/TestFilterableTableSource.scala    |  22 ++--
 19 files changed, 230 insertions(+), 279 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
new file mode 100644
index 0000000..0e8d73f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.types.DataType;
+
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Converts an existing {@link Long}, {@link java.sql.Timestamp}, or
+ * timestamp-formatted {@link String} field (e.g., "2018-05-28 12:34:56.000") into
+ * a rowtime attribute.
+ */
+@PublicEvolving
+public final class ExistingField extends TimestampExtractor {
+
+	private static final long serialVersionUID = 1L;
+
+	private String field;
+
+	/**
+	 * @param field The field to convert into a rowtime attribute.
+	 */
+	public ExistingField(String field) {
+		this.field = checkNotNull(field);
+	}
+
+	@Override
+	public String[] getArgumentFields() {
+		return new String[] {field};
+	}
+
+	@Override
+	public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
+		DataType fieldType = fromLegacyInfoToDataType(argumentFieldTypes[0]);
+
+		switch (fieldType.getLogicalType().getTypeRoot()) {
+			case BIGINT:
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+			case VARCHAR:
+				break;
+			default:
+				throw new ValidationException(String.format(
+						"Field '%s' must be of type Long or Timestamp or String but is of type %s.",
+						field, fieldType));
+		}
+	}
+
+	/**
+	 * Returns an {@link Expression} that casts a {@link Long}, {@link Timestamp}, or
+	 * timestamp formatted {@link String} field (e.g., "2018-05-28 12:34:56.000")
+	 * into a rowtime attribute.
+	 */
+	@Override
+	public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
+		ResolvedFieldReference fieldAccess = fieldAccesses[0];
+		DataType type = fromLegacyInfoToDataType(fieldAccess.resultType());
+
+		FieldReferenceExpression fieldReferenceExpr = new FieldReferenceExpression(
+				fieldAccess.name(),
+				type,
+				0,
+				fieldAccess.fieldIndex());
+
+		switch (type.getLogicalType().getTypeRoot()) {
+			case BIGINT:
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+				return fieldReferenceExpr;
+			case VARCHAR:
+				return unresolvedCall(
+						CAST,
+						fieldReferenceExpr,
+						typeLiteral(TIMESTAMP(3).bridgedTo(Timestamp.class)));
+			default:
+				throw new RuntimeException("Unsupport type: " + type);
+		}
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		Map<String, String> map = new HashMap<>();
+		map.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD);
+		map.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field);
+		return map;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		ExistingField that = (ExistingField) o;
+		return field.equals(that.field);
+	}
+
+	@Override
+	public int hashCode() {
+		return field.hashCode();
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java
index 515dcd0..8be0679 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java
@@ -21,18 +21,37 @@ package org.apache.flink.table.expressions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.sources.FieldComputer;
+import org.apache.flink.util.Preconditions;
 
 /**
  * A reference to a field in an input which has been resolved.
  *
  * <p>Note: This interface is added as a temporary solution. It is used to keep api compatible
- * for {@link FieldComputer}. In the long term, this interface can be removed when we unify
- * the {@link Expression} and {@code PlannerExpression}.
+ * for {@link FieldComputer}. In the long term, this interface can be removed.
  */
 @PublicEvolving
-public interface ResolvedFieldReference {
+public class ResolvedFieldReference {
 
-	TypeInformation<?> resultType();
+	private final String name;
+	private final TypeInformation<?> resultType;
+	private final int fieldIndex;
 
-	String name();
+	public ResolvedFieldReference(String name, TypeInformation<?> resultType, int fieldIndex) {
+		Preconditions.checkArgument(fieldIndex >= 0, "Index of field should be a positive number");
+		this.name = Preconditions.checkNotNull(name, "Field name must not be null.");
+		this.resultType = Preconditions.checkNotNull(resultType, "Field result type must not be null.");
+		this.fieldIndex = fieldIndex;
+	}
+
+	public TypeInformation<?> resultType() {
+		return resultType;
+	}
+
+	public String name() {
+		return name;
+	}
+
+	public int fieldIndex() {
+		return fieldIndex;
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
deleted file mode 100644
index 0ad1f5e..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class ExestingFieldFieldReference(
-    name: String,
-    resultType: TypeInformation[_],
-    fieldIndex: Int) extends ResolvedFieldReference
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
index 7f7397f..fff6b59 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
@@ -36,20 +36,20 @@ object PlannerExpressionUtils {
   }
 
   private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr match {
-    case r: ResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
+    case r: PlannerResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
       true
     case _ => false
   }
 
   private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean = expr match {
-    case r: ResolvedFieldReference
+    case r: PlannerResolvedFieldReference
       if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) =>
       true
     case _ => false
   }
 
   private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean = expr match {
-    case r: ResolvedFieldReference
+    case r: PlannerResolvedFieldReference
       if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) =>
       true
     case _ => false
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
index 406571f..46059b1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -84,9 +84,9 @@ case class OverCall(
 
     // check partitionBy expression keys are resolved field reference
     partitionBy.foreach {
-      case r: ResolvedFieldReference if r.resultType.isKeyType  =>
+      case r: PlannerResolvedFieldReference if r.resultType.isKeyType  =>
         ValidationSuccess
-      case r: ResolvedFieldReference =>
+      case r: PlannerResolvedFieldReference =>
         return ValidationFailure(s"Invalid PartitionBy expression: $r. " +
           s"Expression must return key type.")
       case r =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
index 2b49cd2..64a2f63 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
@@ -92,7 +92,7 @@ case class GetCompositeField(child: PlannerExpression, key: Any) extends UnaryEx
       } else {
         None
       }
-    case c: ResolvedFieldReference =>
+    case c: PlannerResolvedFieldReference =>
       val keySuffix = if (key.isInstanceOf[Int]) s"_$key" else key
       Some(s"${c.name}$$$keySuffix")
     case _ => None
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index bd27739..6d19c8f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -67,7 +67,7 @@ case class UnresolvedFieldReference(name: String) extends Attribute {
 
 case class PlannerResolvedFieldReference(
     name: String,
-    resultType: TypeInformation[_]) extends Attribute with ResolvedFieldReference {
+    resultType: TypeInformation[_]) extends Attribute {
 
   override def toString = s"'$name"
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index 59a8e03..7a750ee 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{DataTypes, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall}
-import org.apache.flink.table.expressions.{ExestingFieldFieldReference, ResolvedFieldReference, RexNodeConverter}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral}
+import org.apache.flink.table.expressions.{ResolvedFieldReference, RexNodeConverter}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
@@ -273,17 +273,20 @@ object TableSourceUtil {
           // push an empty values node with the physical schema on the relbuilder
           relBuilder.push(createSchemaRelNode(resolvedFields))
           // get extraction expression
-          resolvedFields.map(f => ExestingFieldFieldReference(f._1, f._3, f._2))
+          resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2))
         } else {
           new Array[ResolvedFieldReference](0)
         }
 
       val expression = tsExtractor.getExpression(fieldAccesses)
       // add cast to requested type and convert expression to RexNode
+      // The blink runner treats numeric values as seconds when casting between timestamp and
+      // numeric types, so we use REINTERPRET_CAST to keep the milliseconds of numeric values.
       val castExpression = unresolvedCall(
-        BuiltInFunctionDefinitions.CAST,
+        BuiltInFunctionDefinitions.REINTERPRET_CAST,
         expression,
-        typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])))
+        typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])),
+        valueLiteral(false))
       val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder))
       relBuilder.clear()
       rexExpression
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
deleted file mode 100644
index 6b39883..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.tsextractors
-
-import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, TypeInformation, Types}
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.Rowtime
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral}
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-
-import java.util
-
-/**
-  * Converts an existing [[Long]], [[java.sql.Timestamp]], or
-  * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") into
-  * a rowtime attribute.
-  *
-  * @param field The field to convert into a rowtime attribute.
-  */
-final class ExistingField(val field: String) extends TimestampExtractor {
-
-  override def getArgumentFields: Array[String] = Array(field)
-
-  @throws[ValidationException]
-  override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
-    val fieldType = argumentFieldTypes(0)
-
-    fieldType match {
-      case Types.LONG => // OK
-      case Types.SQL_TIMESTAMP => // OK
-      case Types.LOCAL_DATE_TIME => // OK
-      case Types.STRING => // OK
-      case _: TypeInformation[_] =>
-        throw new ValidationException(
-          s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.")
-    }
-  }
-
-  /**
-    * Returns an [[Expression]] that casts a [[Long]], [[java.sql.Timestamp]], or
-    * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000")
-    * into a rowtime attribute.
-    */
-  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
-    val fieldAccess: ExestingFieldFieldReference = fieldAccesses(0)
-        .asInstanceOf[ExestingFieldFieldReference]
-
-    val fieldReferenceExpr = new FieldReferenceExpression(
-      fieldAccess.name,
-      fromLegacyInfoToDataType(fieldAccess.resultType),
-      0,
-      fieldAccess.fieldIndex)
-
-    fieldAccess.resultType match {
-      case Types.LONG =>
-        // access LONG field
-        val innerDiv = unresolvedCall(
-          BuiltInFunctionDefinitions.DIVIDE,
-          fieldReferenceExpr,
-          valueLiteral(new java.math.BigDecimal(1000)))
-
-        unresolvedCall(
-          BuiltInFunctionDefinitions.CAST,
-          innerDiv,
-          typeLiteral(fromLegacyInfoToDataType(Types.SQL_TIMESTAMP)))
-
-      case Types.SQL_TIMESTAMP | LocalTimeTypeInfo.LOCAL_DATE_TIME =>
-        fieldReferenceExpr
-
-      case Types.STRING =>
-        unresolvedCall(
-          BuiltInFunctionDefinitions.CAST,
-          fieldReferenceExpr,
-          typeLiteral(fromLegacyInfoToDataType(Types.SQL_TIMESTAMP)))
-    }
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ExistingField => field == that.field
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    field.hashCode
-  }
-
-  override def toProperties: util.Map[String, String] = {
-    val javaMap = new util.HashMap[String, String]()
-    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)
-    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field)
-    javaMap
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
index d863297..80c297e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.rules.logical;
 
-import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.expressions.PlannerResolvedFieldReference;
 import org.apache.flink.table.plan.logical.LogicalWindow;
 import org.apache.flink.table.plan.logical.rel.LogicalTableAggregate;
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate;
@@ -207,7 +207,7 @@ public class ExtendedAggregateExtractProjectRule extends AggregateExtractProject
 	}
 
 	private int getWindowTimeFieldIndex(LogicalWindow logicalWindow, RelNode input) {
-		ResolvedFieldReference timeAttribute = (ResolvedFieldReference) logicalWindow.timeAttribute();
+		PlannerResolvedFieldReference timeAttribute = (PlannerResolvedFieldReference) logicalWindow.timeAttribute();
 		return input.getRowType().getFieldNames().indexOf(timeAttribute.name());
 	}
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
index 7f7397f..fff6b59 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
@@ -36,20 +36,20 @@ object PlannerExpressionUtils {
   }
 
   private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr match {
-    case r: ResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
+    case r: PlannerResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
       true
     case _ => false
   }
 
   private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean = expr match {
-    case r: ResolvedFieldReference
+    case r: PlannerResolvedFieldReference
       if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) =>
       true
     case _ => false
   }
 
   private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean = expr match {
-    case r: ResolvedFieldReference
+    case r: PlannerResolvedFieldReference
       if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) =>
       true
     case _ => false
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
index 508a7f2..f2da6fc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -172,9 +172,9 @@ case class OverCall(
 
     // check partitionBy expression keys are resolved field reference
     partitionBy.foreach {
-      case r: ResolvedFieldReference if r.resultType.isKeyType  =>
+      case r: PlannerResolvedFieldReference if r.resultType.isKeyType  =>
         ValidationSuccess
-      case r: ResolvedFieldReference =>
+      case r: PlannerResolvedFieldReference =>
         return ValidationFailure(s"Invalid PartitionBy expression: $r. " +
           s"Expression must return key type.")
       case r =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala
index 1f858a1..3e2c374 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala
@@ -100,7 +100,7 @@ case class GetCompositeField(child: PlannerExpression, key: Any) extends UnaryEx
       } else {
         None
       }
-    case c: ResolvedFieldReference =>
+    case c: PlannerResolvedFieldReference =>
       val keySuffix = if (key.isInstanceOf[Int]) s"_$key" else key
       Some(s"${c.name}$$$keySuffix")
     case _ => None
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index 56d4e72..ced9b32 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -71,7 +71,7 @@ case class UnresolvedFieldReference(name: String) extends Attribute {
 
 case class PlannerResolvedFieldReference(
     name: String,
-    resultType: TypeInformation[_]) extends Attribute with ResolvedFieldReference {
+    resultType: TypeInformation[_]) extends Attribute {
 
   override def toString = s"'$name"
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala
index 2b1c1fb..e381464 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin
 import org.apache.flink.table.api.{StreamQueryConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions.PlannerExpressionUtils._
-import org.apache.flink.table.expressions.ResolvedFieldReference
+import org.apache.flink.table.expressions.PlannerResolvedFieldReference
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregateBase._
@@ -135,7 +135,7 @@ abstract class DataStreamGroupWindowAggregateBase(
 
     val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) {
       // copy the window rowtime attribute into the StreamRecord timestamp field
-      val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
+      val timeAttribute = window.timeAttribute.asInstanceOf[PlannerResolvedFieldReference].name
       val timeIdx = inputSchema.fieldNames.indexOf(timeAttribute)
       if (timeIdx < 0) {
         throw new TableException("Time attribute could not be found. This is a bug.")
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index ccc4733..f91f2fa 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.sources
 
 import java.sql.Timestamp
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptCluster
 import org.apache.calcite.rel.RelNode
@@ -29,10 +28,12 @@ import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.api.{DataTypes, TableException, Types, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Cast, PlannerExpression, PlannerResolvedFieldReference, ResolvedFieldReference}
-import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall}
+import org.apache.flink.table.expressions.{PlannerExpressionConverter, ResolvedFieldReference}
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST
+import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 import scala.collection.JavaConverters._
@@ -266,17 +267,23 @@ object TableSourceUtil {
         // push an empty values node with the physical schema on the relbuilder
         relBuilder.push(createSchemaRelNode(resolvedFields))
         // get extraction expression
-        resolvedFields.map(f => PlannerResolvedFieldReference(f._1, f._3))
+        resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2))
       } else {
-        new Array[PlannerResolvedFieldReference](0)
+        new Array[ResolvedFieldReference](0)
       }
 
-      val expression = tsExtractor
-        .getExpression(fieldAccesses.map(_.asInstanceOf[ResolvedFieldReference]))
+      val expression = tsExtractor.getExpression(fieldAccesses)
       // add cast to requested type and convert expression to RexNode
-      // TODO we cast to planner expressions as a temporary solution to keep the old interfaces
-      val rexExpression = Cast(expression.asInstanceOf[PlannerExpression], resultType)
-        .toRexNode(relBuilder)
+      // If resultType is a TimeIndicatorTypeInfo, its internal format is long, but a cast
+      // from Timestamp yields java.sql.Timestamp. So we need to cast to long first.
+      val castExpression = unresolvedCall(CAST,
+        unresolvedCall(CAST, expression, typeLiteral(DataTypes.BIGINT())),
+        typeLiteral(fromLegacyInfoToDataType(resultType)))
+
+      // TODO we convert to planner expressions as a temporary solution
+      val rexExpression = castExpression
+          .accept(PlannerExpressionConverter.INSTANCE)
+          .toRexNode(relBuilder)
       relBuilder.clear()
       rexExpression
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
deleted file mode 100644
index c9f4477..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.tsextractors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation, Types}
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.Rowtime
-import org.apache.flink.table.expressions._
-
-/**
-  * Converts an existing [[Long]], [[java.sql.Timestamp]], or
-  * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") into
-  * a rowtime attribute.
-  *
-  * @param field The field to convert into a rowtime attribute.
-  */
-final class ExistingField(val field: String) extends TimestampExtractor {
-
-  override def getArgumentFields: Array[String] = Array(field)
-
-  @throws[ValidationException]
-  override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
-    val fieldType = argumentFieldTypes(0)
-
-    fieldType match {
-      case Types.LONG => // OK
-      case Types.SQL_TIMESTAMP => // OK
-      case Types.STRING => // OK
-      case _: TypeInformation[_] =>
-        throw new ValidationException(
-          s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.")
-    }
-  }
-
-  /**
-    * Returns an [[Expression]] that casts a [[Long]], [[java.sql.Timestamp]], or
-    * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000")
-    * into a rowtime attribute.
-    */
-  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): PlannerExpression = {
-    val fieldAccess: PlannerExpression = fieldAccesses(0).asInstanceOf[PlannerExpression]
-
-    fieldAccess.resultType match {
-      case Types.LONG =>
-        // access LONG field
-        fieldAccess
-      case Types.SQL_TIMESTAMP =>
-        // cast timestamp to long
-        Cast(fieldAccess, Types.LONG)
-      case Types.STRING =>
-        Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
-    }
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ExistingField => field == that.field
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    field.hashCode
-  }
-
-  override def toProperties: util.Map[String, String] = {
-    val javaMap = new util.HashMap[String, String]()
-    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)
-    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field)
-    javaMap
-  }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
index 3424088..0f51c76 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -19,15 +19,18 @@
 package org.apache.flink.table.descriptors
 
 import java.util
-
 import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{DataTypes, ValidationException}
 import org.apache.flink.table.descriptors.RowtimeTest.{CustomAssigner, CustomExtractor}
 import org.apache.flink.table.expressions._
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.sources.tsextractors.TimestampExtractor
 import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
+import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.types.Row
+
 import org.junit.Test
 
 import scala.collection.JavaConverters._
@@ -130,9 +133,17 @@ object RowtimeTest {
     }
 
     override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
-      val fieldAccess: PlannerExpression = fieldAccesses(0).asInstanceOf[PlannerExpression]
+      val fieldAccess = fieldAccesses(0)
       require(fieldAccess.resultType == Types.SQL_TIMESTAMP)
-      Cast(fieldAccess, Types.LONG)
+      val fieldReferenceExpr = new FieldReferenceExpression(
+        fieldAccess.name,
+        TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType),
+        0,
+        fieldAccess.fieldIndex)
+      ApiExpressionUtils.unresolvedCall(
+        BuiltInFunctionDefinitions.CAST,
+        fieldReferenceExpr,
+        ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()))
     }
 
     override def equals(other: Any): Boolean = other match {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
index 1372da5..4f767f6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
@@ -161,11 +161,11 @@ class TestFilterableTableSource(
 
   private def shouldPushDown(expr: BinaryComparison): Boolean = {
     (expr.left, expr.right) match {
-      case (f: ResolvedFieldReference, v: Literal) =>
+      case (f: PlannerResolvedFieldReference, v: Literal) =>
         filterableFields.contains(f.name)
-      case (v: Literal, f: ResolvedFieldReference) =>
+      case (v: Literal, f: PlannerResolvedFieldReference) =>
         filterableFields.contains(f.name)
-      case (f1: ResolvedFieldReference, f2: ResolvedFieldReference) =>
+      case (f1: PlannerResolvedFieldReference, f2: PlannerResolvedFieldReference) =>
         filterableFields.contains(f1.name) && filterableFields.contains(f2.name)
       case (_, _) => false
     }
@@ -184,15 +184,15 @@ class TestFilterableTableSource(
     expr match {
       case _: GreaterThan =>
         lhsValue.compareTo(rhsValue) > 0
-      case LessThan(l: ResolvedFieldReference, r: Literal) =>
+      case LessThan(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) < 0
-      case GreaterThanOrEqual(l: ResolvedFieldReference, r: Literal) =>
+      case GreaterThanOrEqual(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) >= 0
-      case LessThanOrEqual(l: ResolvedFieldReference, r: Literal) =>
+      case LessThanOrEqual(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) <= 0
-      case EqualTo(l: ResolvedFieldReference, r: Literal) =>
+      case EqualTo(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) == 0
-      case NotEqualTo(l: ResolvedFieldReference, r: Literal) =>
+      case NotEqualTo(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) != 0
     }
   }
@@ -201,12 +201,12 @@ class TestFilterableTableSource(
     : (Comparable[Any], Comparable[Any]) = {
 
     (expr.left, expr.right) match {
-      case (l: ResolvedFieldReference, r: Literal) =>
+      case (l: PlannerResolvedFieldReference, r: Literal) =>
         val idx = rowTypeInfo.getFieldIndex(l.name)
         val lv = row.getField(idx).asInstanceOf[Comparable[Any]]
         val rv = r.value.asInstanceOf[Comparable[Any]]
         (lv, rv)
-      case (l: Literal, r: ResolvedFieldReference) =>
+      case (l: Literal, r: PlannerResolvedFieldReference) =>
         val idx = rowTypeInfo.getFieldIndex(r.name)
         val lv = l.value.asInstanceOf[Comparable[Any]]
         val rv = row.getField(idx).asInstanceOf[Comparable[Any]]
@@ -215,7 +215,7 @@ class TestFilterableTableSource(
         val lv = l.value.asInstanceOf[Comparable[Any]]
         val rv = r.value.asInstanceOf[Comparable[Any]]
         (lv, rv)
-      case (l: ResolvedFieldReference, r: ResolvedFieldReference) =>
+      case (l: PlannerResolvedFieldReference, r: PlannerResolvedFieldReference) =>
         val lidx = rowTypeInfo.getFieldIndex(l.name)
         val ridx = rowTypeInfo.getFieldIndex(r.name)
         val lv = row.getField(lidx).asInstanceOf[Comparable[Any]]


[flink] 04/04: [FLINK-13287][table-api] Port StreamRecordTimestamp to api-java

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c43dc951bf52e2f76a9c17021cfdf1ca8bc54781
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 17 20:25:04 2019 +0800

    [FLINK-13287][table-api] Port StreamRecordTimestamp to api-java
    
    This closes #9129.
---
 .../tsextractors/StreamRecordTimestamp.java        | 75 ++++++++++++++++++++++
 .../tsextractors/StreamRecordTimestamp.scala       | 67 -------------------
 2 files changed, 75 insertions(+), 67 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java
new file mode 100644
index 0000000..fdb4384
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP;
+
+/**
+ * Extracts the timestamp of a StreamRecord into a rowtime attribute.
+ *
+ * <p>Note: This extractor only works for StreamTableSources.
+ */
+@PublicEvolving
+public final class StreamRecordTimestamp extends TimestampExtractor {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final StreamRecordTimestamp INSTANCE = new StreamRecordTimestamp(); // shared instance
+
+	@Override
+	public String[] getArgumentFields() {
+		return new String[0]; // no argument fields required
+	}
+
+	@Override
+	public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
+	} // empty body: there are no argument fields to validate
+
+	@Override
+	public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
+		return ApiExpressionUtils.unresolvedCall(STREAM_RECORD_TIMESTAMP); // fieldAccesses is not used
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		Map<String, String> map = new HashMap<>();
+		map.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE);
+		return map; // a fresh map holding only the timestamps-type entry
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		return this == o || o != null && getClass() == o.getClass(); // equal iff same exact class
+	}
+
+	@Override
+	public int hashCode() {
+		return this.getClass().hashCode(); // same hash for every instance, consistent with equals
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
deleted file mode 100644
index 087b1a6..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.tsextractors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.Rowtime
-import org.apache.flink.table.expressions.{Expression, PlannerExpression, ResolvedFieldReference}
-
-/**
-  * Extracts the timestamp of a StreamRecord into a rowtime attribute.
-  *
-  * Note: This extractor only works for StreamTableSources.
-  */
-final class StreamRecordTimestamp extends TimestampExtractor {
-
-  /** No argument fields required. */
-  override def getArgumentFields: Array[String] = Array()
-
-  /** No validation required. */
-  @throws[ValidationException]
-  override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = { }
-
-  /**
-    * Returns an [[Expression]] that extracts the timestamp of a StreamRecord.
-    */
-  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): PlannerExpression = {
-    org.apache.flink.table.expressions.StreamRecordTimestamp()
-  }
-
-  override def equals(obj: Any): Boolean = obj match {
-    case _: StreamRecordTimestamp => true
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    classOf[StreamRecordTimestamp].hashCode()
-  }
-
-  override def toProperties: util.Map[String, String] = {
-    val javaMap = new util.HashMap[String, String]()
-    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE)
-    javaMap
-  }
-}
-
-object StreamRecordTimestamp {
-  val INSTANCE: StreamRecordTimestamp = new StreamRecordTimestamp
-}


[flink] 02/04: [FLINK-13287][table-planner] Support Reinterpret cast call in blink planner

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa5c8674a6a2bcdbcc7ed898e98a89e3b2f92c50
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 17 20:20:22 2019 +0800

    [FLINK-13287][table-planner] Support Reinterpret cast call in blink planner
---
 .../expressions/PlannerExpressionConverter.scala   |  8 ++++
 .../flink/table/expressions/Reinterpret.scala      | 45 ++++++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 208cad9..8b5dada 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -56,6 +56,14 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
           fromDataTypeToLegacyInfo(
             children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
 
+      case REINTERPRET_CAST =>
+        assert(children.size == 3)
+        Reinterpret(
+          children.head.accept(this),
+          fromDataTypeToLegacyInfo(
+            children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType),
+          getValue[Boolean](children(2).accept(this)))
+
       case WINDOW_START =>
         assert(children.size == 1)
         val windowReference = translateWindowReference(children.head)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/Reinterpret.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/Reinterpret.scala
new file mode 100644
index 0000000..530fd3c
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/Reinterpret.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+case class Reinterpret(child: PlannerExpression, resultType: TypeInformation[_],
+                       checkOverflow: Boolean) extends UnaryExpression {
+
+  override def toString = s"$child.reinterpret($resultType)" // checkOverflow is not printed
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
+    copy(child, resultType).asInstanceOf[this.type] // checkOverflow kept via copy's default
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (TypeCoercion.canReinterpret(
+      fromTypeInfoToLogicalType(child.resultType), fromTypeInfoToLogicalType(resultType))) {
+      ValidationSuccess // canReinterpret accepted the (child, target) logical types
+    } else {
+      ValidationFailure(s"Unsupported reinterpret from ${child.resultType} to $resultType")
+    }
+  }
+}
+


[flink] 01/04: [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07e70a927ee36cd5eda85eece6b221086fe335ff
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jul 17 20:18:56 2019 +0800

    [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
---
 .../java/org/apache/flink/table/expressions/RexNodeConverter.java     | 2 ++
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala   | 4 ++++
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala   | 4 ++++
 3 files changed, 10 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index 2c6ef4d..5528571 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -289,6 +289,8 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 		conversionsOfBuiltInFunc
 				.put(BuiltInFunctionDefinitions.SHA512, exprs -> convert(FlinkSqlOperatorTable.SHA512, exprs));
 		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
+		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
+				exprs -> convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
 	}
 
 	@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index d52d6e6a..208cad9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -683,6 +683,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
             assert(args.isEmpty)
             CurrentRow()
 
+          case STREAM_RECORD_TIMESTAMP =>
+            assert(args.isEmpty)
+            StreamRecordTimestamp()
+
           case _ =>
             throw new TableException(s"Unsupported function definition: $fd")
         }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 999fa56..5684594 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -682,6 +682,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
             assert(args.isEmpty)
             CurrentRow()
 
+          case STREAM_RECORD_TIMESTAMP =>
+            assert(args.isEmpty)
+            StreamRecordTimestamp()
+
           case _ =>
             throw new TableException(s"Unsupported function definition: $fd")
         }