You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/03 06:06:50 UTC

[flink] 02/02: [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 886b01d8b21ed378cbffa6d01217f960eb308770
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Jul 2 14:27:07 2019 +0800

    [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner
    
    This closes #8942
---
 .../expressions/PlannerTypeInferenceUtilImpl.java  | 142 ++++
 .../table/functions/sql/FlinkSqlOperatorTable.java |   1 +
 .../table/api/ExpressionParserException.scala      |  60 ++
 .../flink/table/calcite/FlinkRelBuilder.scala      |   4 +-
 .../flink/table/calcite/FlinkTypeFactory.scala     |  21 +-
 .../flink/table/expressions/ExpressionBridge.scala |  40 +
 .../flink/table/expressions/InputTypeSpec.scala    |  69 ++
 .../table/expressions/PlannerExpression.scala      |  98 +++
 .../expressions/PlannerExpressionConverter.scala   | 836 +++++++++++++++++++++
 .../expressions/PlannerExpressionParserImpl.scala  | 726 ++++++++++++++++++
 .../table/expressions/PlannerExpressionUtils.scala |  68 ++
 .../flink/table/expressions/aggregations.scala     | 439 +++++++++++
 .../flink/table/expressions/arithmetic.scala       | 165 ++++
 .../org/apache/flink/table/expressions/call.scala  | 326 ++++++++
 .../org/apache/flink/table/expressions/cast.scala  |  59 ++
 .../flink/table/expressions/collection.scala       | 235 ++++++
 .../flink/table/expressions/comparison.scala       | 242 ++++++
 .../apache/flink/table/expressions/composite.scala | 108 +++
 .../flink/table/expressions/fieldExpression.scala  | 253 +++++++
 .../flink/table/expressions/hashExpressions.scala  | 124 +++
 .../apache/flink/table/expressions/literals.scala  | 139 ++++
 .../org/apache/flink/table/expressions/logic.scala | 109 +++
 .../flink/table/expressions/mathExpressions.scala  | 532 +++++++++++++
 .../apache/flink/table/expressions/ordering.scala  |  54 ++
 .../flink/table/expressions/overOffsets.scala      |  54 ++
 .../apache/flink/table/expressions/package.scala   |  29 +
 .../table/expressions/stringExpressions.scala      | 585 ++++++++++++++
 .../apache/flink/table/expressions/subquery.scala  |  95 +++
 .../apache/flink/table/expressions/symbols.scala   | 134 ++++
 .../org/apache/flink/table/expressions/time.scala  | 369 +++++++++
 .../flink/table/expressions/windowProperties.scala |  67 ++
 .../functions/utils/UserDefinedFunctionUtils.scala |   8 +
 .../org/apache/flink/table/plan/TreeNode.scala     | 115 +++
 .../flink/table/plan/util/RexNodeExtractor.scala   |   9 +-
 .../flink/table/typeutils/TypeInfoCheckUtils.scala | 277 +++++++
 .../flink/table/validate/ValidationResult.scala    |  53 ++
 .../flink/table/expressions/KeywordParseTest.scala |  62 ++
 .../table/plan/util/RexNodeExtractorTest.scala     | 195 ++---
 38 files changed, 6801 insertions(+), 101 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
new file mode 100644
index 0000000..816e783
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeInferenceUtil;
+import org.apache.flink.table.typeutils.TypeCoercion;
+import org.apache.flink.table.validate.ValidationFailure;
+import org.apache.flink.table.validate.ValidationResult;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava;
+
+/**
+ * Implementation of {@link PlannerTypeInferenceUtil}.
+ */
+@Internal
+public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil {
+
+	private static final PlannerExpressionConverter CONVERTER = PlannerExpressionConverter.INSTANCE();
+
+	@Override
+	public TypeInferenceUtil.Result runTypeInference(
+			UnresolvedCallExpression unresolvedCall,
+			List<ResolvedExpression> resolvedArgs) {
+		final PlannerExpression plannerCall = unresolvedCall.accept(CONVERTER);
+
+		if (plannerCall instanceof InputTypeSpec) {
+			return resolveWithCastedAssignment(
+				unresolvedCall,
+				resolvedArgs,
+				toJava(((InputTypeSpec) plannerCall).expectedTypes()),
+				plannerCall.resultType());
+		} else {
+			validateArguments(plannerCall);
+
+			final List<DataType> expectedArgumentTypes = resolvedArgs.stream()
+				.map(ResolvedExpression::getOutputDataType)
+				.collect(Collectors.toList());
+
+			return new TypeInferenceUtil.Result(
+				expectedArgumentTypes,
+				null,
+				fromLegacyInfoToDataType(plannerCall.resultType()));
+		}
+	}
+
+	private TypeInferenceUtil.Result resolveWithCastedAssignment(
+			UnresolvedCallExpression unresolvedCall,
+			List<ResolvedExpression> args,
+			List<TypeInformation<?>> expectedTypes,
+			TypeInformation<?> resultType) {
+
+		final List<PlannerExpression> plannerArgs = unresolvedCall.getChildren()
+			.stream()
+			.map(e -> e.accept(CONVERTER))
+			.collect(Collectors.toList());
+
+		final List<DataType> castedArgs = IntStream.range(0, plannerArgs.size())
+			.mapToObj(idx -> castIfNeeded(
+				args.get(idx),
+				plannerArgs.get(idx),
+				expectedTypes.get(idx)))
+			.collect(Collectors.toList());
+
+		return new TypeInferenceUtil.Result(
+			castedArgs,
+			null,
+			fromLegacyInfoToDataType(resultType));
+	}
+
+	private void validateArguments(PlannerExpression plannerCall) {
+		if (!plannerCall.valid()) {
+			throw new ValidationException(
+				getValidationErrorMessage(plannerCall)
+					.orElse("Unexpected behavior, validation failed but can't get error messages!"));
+		}
+	}
+
+	/**
+	 * Return the validation error message of this {@link PlannerExpression} or return the
+	 * validation error message of it's children if it passes the validation. Return empty if
+	 * all validation succeeded.
+	 */
+	private Optional<String> getValidationErrorMessage(PlannerExpression plannerCall) {
+		ValidationResult validationResult = plannerCall.validateInput();
+		if (validationResult instanceof ValidationFailure) {
+			return Optional.of(((ValidationFailure) validationResult).message());
+		} else {
+			for (Expression plannerExpression: plannerCall.getChildren()) {
+				Optional<String> errorMessage = getValidationErrorMessage((PlannerExpression) plannerExpression);
+				if (errorMessage.isPresent()) {
+					return errorMessage;
+				}
+			}
+		}
+		return Optional.empty();
+	}
+
+	private DataType castIfNeeded(
+			ResolvedExpression child,
+			PlannerExpression plannerChild,
+			TypeInformation<?> expectedType) {
+		TypeInformation<?> actualType = plannerChild.resultType();
+		if (actualType.equals(expectedType)) {
+			return child.getOutputDataType();
+		} else if (TypeCoercion.canSafelyCast(
+				fromTypeInfoToLogicalType(actualType), fromTypeInfoToLogicalType(expectedType))) {
+			return fromLegacyInfoToDataType(expectedType);
+		} else {
+			throw new ValidationException(String.format("Incompatible type of argument: %s Expected: %s",
+				child,
+				expectedType));
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
index f6c8d6a..38ddf18 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
@@ -1098,6 +1098,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction RAND_INTEGER = SqlStdOperatorTable.RAND_INTEGER;
 	public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD;
 	public static final SqlFunction TIMESTAMP_DIFF = SqlStdOperatorTable.TIMESTAMP_DIFF;
+	public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
 
 	// MATCH_RECOGNIZE
 	public static final SqlFunction FIRST = SqlStdOperatorTable.FIRST;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/ExpressionParserException.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/ExpressionParserException.scala
new file mode 100644
index 0000000..91283bf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/ExpressionParserException.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+/**
+  * Exception for all errors occurring during expression parsing.
+  */
+case class ExpressionParserException(msg: String) extends RuntimeException(msg)
+
+/**
+  * Exception for unwanted method calling on unresolved expression.
+  */
+case class UnresolvedException(msg: String) extends RuntimeException(msg)
+
+/**
+  * Exception for adding an already existent table
+  *
+  * @param catalog    catalog name
+  * @param table      table name
+  * @param cause      the cause
+  */
+case class TableAlreadyExistException(
+    catalog: String,
+    table: String,
+    cause: Throwable)
+    extends RuntimeException(s"Table $catalog.$table already exists.", cause) {
+
+  def this(catalog: String, table: String) = this(catalog, table, null)
+
+}
+
+/**
+  * Exception for adding an already existent catalog
+  *
+  * @param catalog catalog name
+  * @param cause the cause
+  */
+case class CatalogAlreadyExistException(
+    catalog: String,
+    cause: Throwable)
+    extends RuntimeException(s"Catalog $catalog already exists.", cause) {
+
+  def this(catalog: String) = this(catalog, null)
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index 8b5cf8a..b560fa5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.calcite
 
 import org.apache.flink.table.calcite.FlinkRelFactories.{ExpandFactory, RankFactory, SinkFactory}
-import org.apache.flink.table.expressions.PlannerWindowProperty
+import org.apache.flink.table.expressions.{PlannerWindowProperty, WindowProperty}
 import org.apache.flink.table.operations.QueryOperation
 import org.apache.flink.table.plan.QueryOperationConverter
 import org.apache.flink.table.runtime.rank.{RankRange, RankType}
@@ -113,6 +113,8 @@ object FlinkRelBuilder {
     */
   case class PlannerNamedWindowProperty(name: String, property: PlannerWindowProperty)
 
+  case class NamedWindowProperty(name: String, property: WindowProperty)
+
   def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() {
     def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder =
       new FlinkRelBuilder(context, cluster, schema)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index ce40ae2..59a5502 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.calcite
 
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo
+import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.api.{DataTypes, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory.toLogicalType
 import org.apache.flink.table.plan.schema.{GenericRelDataType, _}
 import org.apache.flink.table.types.logical._
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.types.Nothing
 import org.apache.flink.util.Preconditions.checkArgument
 
@@ -369,6 +370,24 @@ object FlinkTypeFactory {
     case _ => false
   }
 
+  @Deprecated
+  def isProctimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case ti: TimeIndicatorTypeInfo if !ti.isEventTime => true
+    case _ => false
+  }
+
+  @Deprecated
+  def isRowtimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case ti: TimeIndicatorTypeInfo if ti.isEventTime => true
+    case _ => false
+  }
+
+  @Deprecated
+  def isTimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case ti: TimeIndicatorTypeInfo => true
+    case _ => false
+  }
+
   def toLogicalType(relDataType: RelDataType): LogicalType = {
     val logicalType = relDataType.getSqlTypeName match {
       case BOOLEAN => new BooleanType()
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala
new file mode 100644
index 0000000..7000bad
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.table.catalog.FunctionLookup
+import org.apache.flink.table.expressions.resolver.LookupCallResolver
+
+/**
+  * Bridges between API [[Expression]]s (for both Java and Scala) and final expression stack.
+  */
+class ExpressionBridge[E <: Expression](
+    functionCatalog: FunctionLookup,
+    finalVisitor: ExpressionVisitor[E]) {
+
+  private val callResolver = new LookupCallResolver(functionCatalog)
+
+  def bridge(expression: Expression): E = {
+    // resolve calls
+    val resolvedExpressionTree = expression.accept(callResolver)
+
+    // convert to final expressions
+    resolvedExpressionTree.accept(finalVisitor)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
new file mode 100644
index 0000000..3506e9a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import scala.collection.mutable
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.validate._
+
+/**
+  * Expressions that have strict data type specification on its inputs.
+  */
+trait InputTypeSpec extends PlannerExpression {
+
+  /**
+    * Input type specification for each child.
+    *
+    * For example, [[Power]] expecting both of the children be of double type should use:
+    * {{{
+    *   def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+    * }}}
+    *
+    * Inputs that don't match the expected type will be safely casted to a higher type. Therefore,
+    * use the decimal type with caution as all numeric types would be casted to a very
+    * inefficient type.
+    */
+  private[flink] def expectedTypes: Seq[TypeInformation[_]]
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val typeMismatches = mutable.ArrayBuffer.empty[String]
+
+    if(expectedTypes.size != children.size){
+      return ValidationFailure(
+        s"""|$this fails on input type size checking: expected types size[${expectedTypes.size}].
+            |Operands types size[${children.size}].
+            |""".stripMargin)
+    }
+
+    children.zip(expectedTypes).zipWithIndex.foreach { case ((e, tpe), i) =>
+      if (e.resultType != tpe) {
+        typeMismatches += s"expecting $tpe on ${i}th input, get ${e.resultType}"
+      }
+    }
+    if (typeMismatches.isEmpty) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"""|$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}.
+            |Operand should be casted to proper type
+            |""".stripMargin)
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpression.scala
new file mode 100644
index 0000000..92efcb5
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpression.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import java.util
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.TreeNode
+import org.apache.flink.table.validate.{ValidationResult, ValidationSuccess}
+
+import _root_.scala.collection.JavaConversions._
+
+abstract class PlannerExpression extends TreeNode[PlannerExpression] with Expression {
+  /**
+    * Returns the [[TypeInformation]] for evaluating this expression.
+    * It is sometimes not available until the expression is valid.
+    */
+  private[flink] def resultType: TypeInformation[_]
+
+  /**
+    * One pass validation of the expression tree in post order.
+    */
+  private[flink] lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  private[flink] def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+    * Check input data types, inputs number or other properties specified by this expression.
+    * Return `ValidationSuccess` if it pass the check,
+    * or `ValidationFailure` with supplement message explaining the error.
+    * Note: we should only call this method until `childrenValid == true`
+    */
+  private[flink] def validateInput(): ValidationResult = ValidationSuccess
+
+  /**
+    * Convert Expression to its counterpart in Calcite, i.e. RexNode
+    */
+  private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException(
+      s"${this.getClass.getName} cannot be transformed to RexNode"
+    )
+
+  private[flink] def checkEquals(other: PlannerExpression): Boolean = {
+    if (this.getClass != other.getClass) {
+      false
+    } else {
+      def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
+        elements1.length == elements2.length && elements1.zip(elements2).forall {
+          case (e1: PlannerExpression, e2: PlannerExpression) => e1.checkEquals(e2)
+          case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
+          case (i1, i2) => i1 == i2
+        }
+      }
+      val elements1 = this.productIterator.toSeq
+      val elements2 = other.productIterator.toSeq
+      checkEquality(elements1, elements2)
+    }
+  }
+
+  override def asSummaryString(): String = toString
+
+  override def getChildren: util.List[Expression] = children
+
+  override def accept[R](visitor: ExpressionVisitor[R]): R = visitor.visit(this)
+}
+
+abstract class BinaryExpression extends PlannerExpression {
+  private[flink] def left: PlannerExpression
+  private[flink] def right: PlannerExpression
+  private[flink] def children = Seq(left, right)
+}
+
+abstract class UnaryExpression extends PlannerExpression {
+  private[flink] def child: PlannerExpression
+  private[flink] def children = Seq(child)
+}
+
+abstract class LeafExpression extends PlannerExpression {
+  private[flink] val children = Nil
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
new file mode 100644
index 0000000..d52d6e6a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -0,0 +1,836 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
+import org.apache.flink.table.expressions.{E => PlannerE, UUID => PlannerUUID}
+import org.apache.flink.table.functions._
+import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
+import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+  * Visitor implementation for converting [[Expression]]s to [[PlannerExpression]]s.
+  */
+class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExpression] {
+
+  override def visit(call: CallExpression): PlannerExpression = {
+    translateCall(call.getFunctionDefinition, call.getChildren.asScala)
+  }
+
+  override def visit(unresolvedCall: UnresolvedCallExpression): PlannerExpression = {
+    translateCall(unresolvedCall.getFunctionDefinition, unresolvedCall.getChildren.asScala)
+  }
+
+  private def translateCall(
+      func: FunctionDefinition,
+      children: Seq[Expression])
+    : PlannerExpression = {
+
+    // special case: requires individual handling of child expressions
+    func match {
+      case CAST =>
+        assert(children.size == 2)
+        return Cast(
+          children.head.accept(this),
+          fromDataTypeToLegacyInfo(
+            children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
+
+      case WINDOW_START =>
+        assert(children.size == 1)
+        val windowReference = translateWindowReference(children.head)
+        return WindowStart(windowReference)
+
+      case WINDOW_END =>
+        assert(children.size == 1)
+        val windowReference = translateWindowReference(children.head)
+        return WindowEnd(windowReference)
+
+      case PROCTIME =>
+        assert(children.size == 1)
+        val windowReference = translateWindowReference(children.head)
+        return ProctimeAttribute(windowReference)
+
+      case ROWTIME =>
+        assert(children.size == 1)
+        val windowReference = translateWindowReference(children.head)
+        return RowtimeAttribute(windowReference)
+
+      case _ =>
+    }
+
+    val args = children.map(_.accept(this))
+
+    func match {
+      case sfd: ScalarFunctionDefinition =>
+        val call = PlannerScalarFunctionCall(
+          sfd.getScalarFunction,
+          args)
+        //it configures underlying state
+        call.validateInput()
+        call
+
+      case tfd: TableFunctionDefinition =>
+        PlannerTableFunctionCall(
+          tfd.toString,
+          tfd.getTableFunction,
+          args,
+          tfd.getResultType)
+
+      case afd: AggregateFunctionDefinition =>
+        AggFunctionCall(
+          afd.getAggregateFunction,
+          afd.getResultTypeInfo,
+          afd.getAccumulatorTypeInfo,
+          args)
+
+      case tafd: TableAggregateFunctionDefinition =>
+        AggFunctionCall(
+          tafd.getTableAggregateFunction,
+          tafd.getResultTypeInfo,
+          tafd.getAccumulatorTypeInfo,
+          args)
+
+      case fd: FunctionDefinition =>
+        fd match {
+
+          case AS =>
+            assert(args.size >= 2)
+            val name = getValue[String](args(1))
+            val extraNames = args
+              .drop(2)
+              .map(e => getValue[String](e))
+            Alias(args.head, name, extraNames)
+
+          case FLATTEN =>
+            assert(args.size == 1)
+            Flattening(args.head)
+
+          case GET =>
+            assert(args.size == 2)
+            val expr = GetCompositeField(args.head, getValue(args.last))
+            //it configures underlying state
+            expr.validateInput()
+            expr
+
+          case AND =>
+            assert(args.size == 2)
+            And(args.head, args.last)
+
+          case OR =>
+            assert(args.size == 2)
+            Or(args.head, args.last)
+
+          case NOT =>
+            assert(args.size == 1)
+            Not(args.head)
+
+          case EQUALS =>
+            assert(args.size == 2)
+            EqualTo(args.head, args.last)
+
+          case GREATER_THAN =>
+            assert(args.size == 2)
+            GreaterThan(args.head, args.last)
+
+          case GREATER_THAN_OR_EQUAL =>
+            assert(args.size == 2)
+            GreaterThanOrEqual(args.head, args.last)
+
+          case LESS_THAN =>
+            assert(args.size == 2)
+            LessThan(args.head, args.last)
+
+          case LESS_THAN_OR_EQUAL =>
+            assert(args.size == 2)
+            LessThanOrEqual(args.head, args.last)
+
+          case NOT_EQUALS =>
+            assert(args.size == 2)
+            NotEqualTo(args.head, args.last)
+
+          case IN =>
+            assert(args.size > 1)
+            In(args.head, args.drop(1))
+
+          case IS_NULL =>
+            assert(args.size == 1)
+            IsNull(args.head)
+
+          case IS_NOT_NULL =>
+            assert(args.size == 1)
+            IsNotNull(args.head)
+
+          case IS_TRUE =>
+            assert(args.size == 1)
+            IsTrue(args.head)
+
+          case IS_FALSE =>
+            assert(args.size == 1)
+            IsFalse(args.head)
+
+          case IS_NOT_TRUE =>
+            assert(args.size == 1)
+            IsNotTrue(args.head)
+
+          case IS_NOT_FALSE =>
+            assert(args.size == 1)
+            IsNotFalse(args.head)
+
+          case IF =>
+            assert(args.size == 3)
+            If(args.head, args(1), args.last)
+
+          case BETWEEN =>
+            assert(args.size == 3)
+            Between(args.head, args(1), args.last)
+
+          case NOT_BETWEEN =>
+            assert(args.size == 3)
+            NotBetween(args.head, args(1), args.last)
+
+          case DISTINCT =>
+            assert(args.size == 1)
+            DistinctAgg(args.head)
+
+          case AVG =>
+            assert(args.size == 1)
+            Avg(args.head)
+
+          case COUNT =>
+            assert(args.size == 1)
+            Count(args.head)
+
+          case MAX =>
+            assert(args.size == 1)
+            Max(args.head)
+
+          case MIN =>
+            assert(args.size == 1)
+            Min(args.head)
+
+          case SUM =>
+            assert(args.size == 1)
+            Sum(args.head)
+
+          case SUM0 =>
+            assert(args.size == 1)
+            Sum0(args.head)
+
+          case STDDEV_POP =>
+            assert(args.size == 1)
+            StddevPop(args.head)
+
+          case STDDEV_SAMP =>
+            assert(args.size == 1)
+            StddevSamp(args.head)
+
+          case VAR_POP =>
+            assert(args.size == 1)
+            VarPop(args.head)
+
+          case VAR_SAMP =>
+            assert(args.size == 1)
+            VarSamp(args.head)
+
+          case COLLECT =>
+            assert(args.size == 1)
+            Collect(args.head)
+
+          case CHAR_LENGTH =>
+            assert(args.size == 1)
+            CharLength(args.head)
+
+          case INIT_CAP =>
+            assert(args.size == 1)
+            InitCap(args.head)
+
+          case LIKE =>
+            assert(args.size == 2)
+            Like(args.head, args.last)
+
+          case LOWER =>
+            assert(args.size == 1)
+            Lower(args.head)
+
+          case SIMILAR =>
+            assert(args.size == 2)
+            Similar(args.head, args.last)
+
+          case SUBSTRING =>
+            assert(args.size == 2 || args.size == 3)
+            if (args.size == 2) {
+              new Substring(args.head, args.last)
+            } else {
+              Substring(args.head, args(1), args.last)
+            }
+
+          case REPLACE =>
+            assert(args.size == 2 || args.size == 3)
+            if (args.size == 2) {
+              new Replace(args.head, args.last)
+            } else {
+              Replace(args.head, args(1), args.last)
+            }
+
+          case TRIM =>
+            assert(args.size == 4)
+            val removeLeading = getValue[Boolean](args.head)
+            val removeTrailing = getValue[Boolean](args(1))
+
+            val trimMode = if (removeLeading && removeTrailing) {
+              PlannerTrimMode.BOTH
+            } else if (removeLeading) {
+              PlannerTrimMode.LEADING
+            } else if (removeTrailing) {
+              PlannerTrimMode.TRAILING
+            } else {
+              throw new TableException("Unsupported trim mode.")
+            }
+            Trim(trimMode, args(2), args(3))
+
+          case UPPER =>
+            assert(args.size == 1)
+            Upper(args.head)
+
+          case POSITION =>
+            assert(args.size == 2)
+            Position(args.head, args.last)
+
+          case OVERLAY =>
+            assert(args.size == 3 || args.size == 4)
+            if (args.size == 3) {
+              new Overlay(args.head, args(1), args.last)
+            } else {
+              Overlay(
+                args.head,
+                args(1),
+                args(2),
+                args.last)
+            }
+
+          case CONCAT =>
+            Concat(args)
+
+          case CONCAT_WS =>
+            assert(args.nonEmpty)
+            ConcatWs(args.head, args.tail)
+
+          case LPAD =>
+            assert(args.size == 3)
+            Lpad(args.head, args(1), args.last)
+
+          case RPAD =>
+            assert(args.size == 3)
+            Rpad(args.head, args(1), args.last)
+
+          case REGEXP_EXTRACT =>
+            assert(args.size == 2 || args.size == 3)
+            if (args.size == 2) {
+              RegexpExtract(args.head, args.last)
+            } else {
+              RegexpExtract(args.head, args(1), args.last)
+            }
+
+          case FROM_BASE64 =>
+            assert(args.size == 1)
+            FromBase64(args.head)
+
+          case TO_BASE64 =>
+            assert(args.size == 1)
+            ToBase64(args.head)
+
+          case BuiltInFunctionDefinitions.UUID =>
+            assert(args.isEmpty)
+            PlannerUUID()
+
+          case LTRIM =>
+            assert(args.size == 1)
+            LTrim(args.head)
+
+          case RTRIM =>
+            assert(args.size == 1)
+            RTrim(args.head)
+
+          case REPEAT =>
+            assert(args.size == 2)
+            Repeat(args.head, args.last)
+
+          case REGEXP_REPLACE =>
+            assert(args.size == 3)
+            RegexpReplace(args.head, args(1), args.last)
+
+          case PLUS =>
+            assert(args.size == 2)
+            Plus(args.head, args.last)
+
+          case MINUS =>
+            assert(args.size == 2)
+            Minus(args.head, args.last)
+
+          case DIVIDE =>
+            assert(args.size == 2)
+            Div(args.head, args.last)
+
+          case TIMES =>
+            assert(args.size == 2)
+            Mul(args.head, args.last)
+
+          case ABS =>
+            assert(args.size == 1)
+            Abs(args.head)
+
+          case CEIL =>
+            assert(args.size == 1 || args.size == 2)
+            if (args.size == 1) {
+              Ceil(args.head)
+            } else {
+              TemporalCeil(args.head, args.last)
+            }
+
+          case EXP =>
+            assert(args.size == 1)
+            Exp(args.head)
+
+          case FLOOR =>
+            assert(args.size == 1 || args.size == 2)
+            if (args.size == 1) {
+              Floor(args.head)
+            } else {
+              TemporalFloor(args.head, args.last)
+            }
+
+          case LOG10 =>
+            assert(args.size == 1)
+            Log10(args.head)
+
+          case LOG2 =>
+            assert(args.size == 1)
+            Log2(args.head)
+
+          case LN =>
+            assert(args.size == 1)
+            Ln(args.head)
+
+          case LOG =>
+            assert(args.size == 1 || args.size == 2)
+            if (args.size == 1) {
+              Log(args.head)
+            } else {
+              Log(args.head, args.last)
+            }
+
+          case POWER =>
+            assert(args.size == 2)
+            Power(args.head, args.last)
+
+          case MOD =>
+            assert(args.size == 2)
+            Mod(args.head, args.last)
+
+          case SQRT =>
+            assert(args.size == 1)
+            Sqrt(args.head)
+
+          case MINUS_PREFIX =>
+            assert(args.size == 1)
+            UnaryMinus(args.head)
+
+          case SIN =>
+            assert(args.size == 1)
+            Sin(args.head)
+
+          case COS =>
+            assert(args.size == 1)
+            Cos(args.head)
+
+          case SINH =>
+            assert(args.size == 1)
+            Sinh(args.head)
+
+          case TAN =>
+            assert(args.size == 1)
+            Tan(args.head)
+
+          case TANH =>
+            assert(args.size == 1)
+            Tanh(args.head)
+
+          case COT =>
+            assert(args.size == 1)
+            Cot(args.head)
+
+          case ASIN =>
+            assert(args.size == 1)
+            Asin(args.head)
+
+          case ACOS =>
+            assert(args.size == 1)
+            Acos(args.head)
+
+          case ATAN =>
+            assert(args.size == 1)
+            Atan(args.head)
+
+          case ATAN2 =>
+            assert(args.size == 2)
+            Atan2(args.head, args.last)
+
+          case COSH =>
+            assert(args.size == 1)
+            Cosh(args.head)
+
+          case DEGREES =>
+            assert(args.size == 1)
+            Degrees(args.head)
+
+          case RADIANS =>
+            assert(args.size == 1)
+            Radians(args.head)
+
+          case SIGN =>
+            assert(args.size == 1)
+            Sign(args.head)
+
+          case ROUND =>
+            assert(args.size == 2)
+            Round(args.head, args.last)
+
+          case PI =>
+            assert(args.isEmpty)
+            Pi()
+
+          case BuiltInFunctionDefinitions.E =>
+            assert(args.isEmpty)
+            PlannerE()
+
+          case RAND =>
+            assert(args.isEmpty || args.size == 1)
+            if (args.isEmpty) {
+              new Rand()
+            } else {
+              Rand(args.head)
+            }
+
+          case RAND_INTEGER =>
+            assert(args.size == 1 || args.size == 2)
+            if (args.size == 1) {
+              new RandInteger(args.head)
+            } else {
+              RandInteger(args.head, args.last)
+            }
+
+          case BIN =>
+            assert(args.size == 1)
+            Bin(args.head)
+
+          case HEX =>
+            assert(args.size == 1)
+            Hex(args.head)
+
+          case TRUNCATE =>
+            assert(args.size == 1 || args.size == 2)
+            if (args.size == 1) {
+              new Truncate(args.head)
+            } else {
+              Truncate(args.head, args.last)
+            }
+
+          case EXTRACT =>
+            assert(args.size == 2)
+            Extract(args.head, args.last)
+
+          case CURRENT_DATE =>
+            assert(args.isEmpty)
+            CurrentDate()
+
+          case CURRENT_TIME =>
+            assert(args.isEmpty)
+            CurrentTime()
+
+          case CURRENT_TIMESTAMP =>
+            assert(args.isEmpty)
+            CurrentTimestamp()
+
+          case LOCAL_TIME =>
+            assert(args.isEmpty)
+            LocalTime()
+
+          case LOCAL_TIMESTAMP =>
+            assert(args.isEmpty)
+            LocalTimestamp()
+
+          case TEMPORAL_OVERLAPS =>
+            assert(args.size == 4)
+            TemporalOverlaps(
+              args.head,
+              args(1),
+              args(2),
+              args.last)
+
+          case DATE_TIME_PLUS =>
+            assert(args.size == 2)
+            Plus(args.head, args.last)
+
+          case DATE_FORMAT =>
+            assert(args.size == 2)
+            DateFormat(args.head, args.last)
+
+          case TIMESTAMP_DIFF =>
+            assert(args.size == 3)
+            TimestampDiff(args.head, args(1), args.last)
+
+          case AT =>
+            assert(args.size == 2)
+            ItemAt(args.head, args.last)
+
+          case CARDINALITY =>
+            assert(args.size == 1)
+            Cardinality(args.head)
+
+          case ARRAY =>
+            ArrayConstructor(args)
+
+          case ARRAY_ELEMENT =>
+            assert(args.size == 1)
+            ArrayElement(args.head)
+
+          case MAP =>
+            MapConstructor(args)
+
+          case ROW =>
+            RowConstructor(args)
+
+          case ORDER_ASC =>
+            assert(args.size == 1)
+            Asc(args.head)
+
+          case ORDER_DESC =>
+            assert(args.size == 1)
+            Desc(args.head)
+
+          case MD5 =>
+            assert(args.size == 1)
+            Md5(args.head)
+
+          case SHA1 =>
+            assert(args.size == 1)
+            Sha1(args.head)
+
+          case SHA224 =>
+            assert(args.size == 1)
+            Sha224(args.head)
+
+          case SHA256 =>
+            assert(args.size == 1)
+            Sha256(args.head)
+
+          case SHA384 =>
+            assert(args.size == 1)
+            Sha384(args.head)
+
+          case SHA512 =>
+            assert(args.size == 1)
+            Sha512(args.head)
+
+          case SHA2 =>
+            assert(args.size == 2)
+            Sha2(args.head, args.last)
+
+          case OVER =>
+            assert(args.size >= 4)
+            OverCall(
+              args.head,
+              args.slice(4, args.size),
+              args(1),
+              args(2),
+              args(3)
+            )
+
+          case UNBOUNDED_RANGE =>
+            assert(args.isEmpty)
+            UnboundedRange()
+
+          case UNBOUNDED_ROW =>
+            assert(args.isEmpty)
+            UnboundedRow()
+
+          case CURRENT_RANGE =>
+            assert(args.isEmpty)
+            CurrentRange()
+
+          case CURRENT_ROW =>
+            assert(args.isEmpty)
+            CurrentRow()
+
+          case _ =>
+            throw new TableException(s"Unsupported function definition: $fd")
+        }
+    }
+  }
+
+  override def visit(literal: ValueLiteralExpression): PlannerExpression = {
+    if (hasRoot(literal.getOutputDataType.getLogicalType, SYMBOL)) {
+      val plannerSymbol = getSymbol(literal.getValueAs(classOf[TableSymbol]).get())
+      return SymbolPlannerExpression(plannerSymbol)
+    }
+
+    val typeInfo = getLiteralTypeInfo(literal)
+    if (literal.isNull) {
+      Null(typeInfo)
+    } else {
+      Literal(
+        literal.getValueAs(typeInfo.getTypeClass).get(),
+        typeInfo)
+    }
+  }
+
+  /**
+    * This method makes the planner more lenient for new data types defined for literals.
+    */
+  private def getLiteralTypeInfo(literal: ValueLiteralExpression): TypeInformation[_] = {
+    val logicalType = literal.getOutputDataType.getLogicalType
+
+    if (hasRoot(logicalType, DECIMAL)) {
+      if (literal.isNull) {
+        return Types.BIG_DEC
+      }
+      val value = literal.getValueAs(classOf[java.math.BigDecimal]).get()
+      if (hasPrecision(logicalType, value.precision()) && hasScale(logicalType, value.scale())) {
+        return Types.BIG_DEC
+      }
+    }
+
+    else if (hasRoot(logicalType, CHAR)) {
+      if (literal.isNull) {
+        return Types.STRING
+      }
+      val value = literal.getValueAs(classOf[java.lang.String]).get()
+      if (hasLength(logicalType, value.length)) {
+        return Types.STRING
+      }
+    }
+
+    else if (hasRoot(logicalType, TIMESTAMP_WITHOUT_TIME_ZONE)) {
+      if (getPrecision(logicalType) <= 3) {
+        return Types.SQL_TIMESTAMP
+      }
+    }
+
+    fromDataTypeToLegacyInfo(literal.getOutputDataType)
+  }
+
+  private def getSymbol(symbol: TableSymbol): PlannerSymbol = symbol match {
+    case TimeIntervalUnit.YEAR => PlannerTimeIntervalUnit.YEAR
+    case TimeIntervalUnit.YEAR_TO_MONTH => PlannerTimeIntervalUnit.YEAR_TO_MONTH
+    case TimeIntervalUnit.QUARTER => PlannerTimeIntervalUnit.QUARTER
+    case TimeIntervalUnit.MONTH => PlannerTimeIntervalUnit.MONTH
+    case TimeIntervalUnit.WEEK => PlannerTimeIntervalUnit.WEEK
+    case TimeIntervalUnit.DAY => PlannerTimeIntervalUnit.DAY
+    case TimeIntervalUnit.DAY_TO_HOUR => PlannerTimeIntervalUnit.DAY_TO_HOUR
+    case TimeIntervalUnit.DAY_TO_MINUTE => PlannerTimeIntervalUnit.DAY_TO_MINUTE
+    case TimeIntervalUnit.DAY_TO_SECOND => PlannerTimeIntervalUnit.DAY_TO_SECOND
+    case TimeIntervalUnit.HOUR => PlannerTimeIntervalUnit.HOUR
+    case TimeIntervalUnit.SECOND => PlannerTimeIntervalUnit.SECOND
+    case TimeIntervalUnit.HOUR_TO_MINUTE => PlannerTimeIntervalUnit.HOUR_TO_MINUTE
+    case TimeIntervalUnit.HOUR_TO_SECOND => PlannerTimeIntervalUnit.HOUR_TO_SECOND
+    case TimeIntervalUnit.MINUTE => PlannerTimeIntervalUnit.MINUTE
+    case TimeIntervalUnit.MINUTE_TO_SECOND => PlannerTimeIntervalUnit.MINUTE_TO_SECOND
+    case TimePointUnit.YEAR => PlannerTimePointUnit.YEAR
+    case TimePointUnit.MONTH => PlannerTimePointUnit.MONTH
+    case TimePointUnit.DAY => PlannerTimePointUnit.DAY
+    case TimePointUnit.HOUR => PlannerTimePointUnit.HOUR
+    case TimePointUnit.MINUTE => PlannerTimePointUnit.MINUTE
+    case TimePointUnit.SECOND => PlannerTimePointUnit.SECOND
+    case TimePointUnit.QUARTER => PlannerTimePointUnit.QUARTER
+    case TimePointUnit.WEEK => PlannerTimePointUnit.WEEK
+    case TimePointUnit.MILLISECOND => PlannerTimePointUnit.MILLISECOND
+    case TimePointUnit.MICROSECOND => PlannerTimePointUnit.MICROSECOND
+
+    case _ =>
+      throw new TableException("Unsupported symbol: " + symbol)
+  }
+
+  override def visit(fieldReference: FieldReferenceExpression): PlannerExpression = {
+    PlannerResolvedFieldReference(
+      fieldReference.getName,
+      fromDataTypeToLegacyInfo(fieldReference.getOutputDataType))
+  }
+
+  override def visit(fieldReference: UnresolvedReferenceExpression)
+    : PlannerExpression = {
+    UnresolvedFieldReference(fieldReference.getName)
+  }
+
+  override def visit(typeLiteral: TypeLiteralExpression): PlannerExpression = {
+    throw new TableException("Unsupported type literal expression: " + typeLiteral)
+  }
+
+  override def visit(tableRef: TableReferenceExpression): PlannerExpression = {
+    TableReference(
+      tableRef.asInstanceOf[TableReferenceExpression].getName,
+      tableRef.asInstanceOf[TableReferenceExpression].getQueryOperation
+    )
+  }
+
+  override def visit(localReference: LocalReferenceExpression): PlannerExpression =
+    throw new TableException(
+      "Local reference should be handled individually by a call: " + localReference)
+
+  override def visit(lookupCall: LookupCallExpression): PlannerExpression =
+    throw new TableException("Unsupported function call: " + lookupCall)
+
+  override def visitNonApiExpression(other: Expression): PlannerExpression = {
+    other match {
+      // already converted planner expressions will pass this visitor without modification
+      case plannerExpression: PlannerExpression => plannerExpression
+
+      case _ =>
+        throw new TableException("Unrecognized expression: " + other)
+    }
+  }
+
+  private def getValue[T](literal: PlannerExpression): T = {
+    literal.asInstanceOf[Literal].value.asInstanceOf[T]
+  }
+
+  private def assert(condition: Boolean): Unit = {
+    if (!condition) {
+      throw new ValidationException("Invalid number of arguments for function.")
+    }
+  }
+
+  private def translateWindowReference(reference: Expression): PlannerExpression = reference match {
+    case expr : LocalReferenceExpression =>
+      WindowReference(expr.getName, Some(fromDataTypeToLegacyInfo(expr.getOutputDataType)))
+    //just because how the datastream is converted to table
+    case expr: UnresolvedReferenceExpression =>
+      UnresolvedFieldReference(expr.getName)
+    case _ =>
+      throw new ValidationException(s"Expected LocalReferenceExpression. Got: $reference")
+  }
+}
+
+object PlannerExpressionConverter {
+  val INSTANCE: PlannerExpressionConverter = new PlannerExpressionConverter
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
new file mode 100644
index 0000000..9f71209
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
@@ -0,0 +1,726 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import _root_.java.math.{BigDecimal => JBigDecimal}
+import _root_.java.util.{List => JList}
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api._
+import org.apache.flink.table.delegation.PlannerExpressionParser
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+
+import _root_.scala.collection.JavaConversions._
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+
+/**
+  * The implementation of a [[PlannerExpressionParser]] which parsers expressions inside a String.
+  */
+class PlannerExpressionParserImpl extends PlannerExpressionParser {
+
+  def parseExpression(exprString: String): Expression = {
+    PlannerExpressionParserImpl.parseExpression(exprString)
+  }
+
+  override def parseExpressionList(expression: String): JList[Expression] = {
+    PlannerExpressionParserImpl.parseExpressionList(expression)
+  }
+}
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * See ImplicitExpressionConversions and ImplicitExpressionOperations for the constructs
+ * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
+ * lazy valined in the above files.
+ */
+object PlannerExpressionParserImpl extends JavaTokenParsers
+  with PackratParsers
+  with PlannerExpressionParser {
+
+  case class Keyword(key: String)
+
+  // Convert the keyword into an case insensitive Parser
+  // The pattern ensures that the keyword is not matched as a prefix, i.e.,
+  //   the keyword is not followed by a Java identifier character.
+  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
+    ("""(?i)\Q""" + kw.key + """\E(?![_$\p{javaJavaIdentifierPart}])""").r
+  }
+
+  // Keyword
+  lazy val AS: Keyword = Keyword("as")
+  lazy val CAST: Keyword = Keyword("cast")
+  lazy val ASC: Keyword = Keyword("asc")
+  lazy val DESC: Keyword = Keyword("desc")
+  lazy val NULL: Keyword = Keyword("Null")
+  lazy val NULL_OF: Keyword = Keyword("nullOf")
+  lazy val IF: Keyword = Keyword("?")
+  lazy val TO_DATE: Keyword = Keyword("toDate")
+  lazy val TO_TIME: Keyword = Keyword("toTime")
+  lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
+  lazy val TRIM: Keyword = Keyword("trim")
+  lazy val EXTRACT: Keyword = Keyword("extract")
+  lazy val TIMESTAMP_DIFF: Keyword = Keyword("timestampDiff")
+  lazy val FLOOR: Keyword = Keyword("floor")
+  lazy val CEIL: Keyword = Keyword("ceil")
+  lazy val LOG: Keyword = Keyword("log")
+  lazy val YEARS: Keyword = Keyword("years")
+  lazy val YEAR: Keyword = Keyword("year")
+  lazy val QUARTERS: Keyword = Keyword("quarters")
+  lazy val QUARTER: Keyword = Keyword("quarter")
+  lazy val MONTHS: Keyword = Keyword("months")
+  lazy val MONTH: Keyword = Keyword("month")
+  lazy val WEEKS: Keyword = Keyword("weeks")
+  lazy val WEEK: Keyword = Keyword("week")
+  lazy val DAYS: Keyword = Keyword("days")
+  lazy val DAY: Keyword = Keyword("day")
+  lazy val HOURS: Keyword = Keyword("hours")
+  lazy val HOUR: Keyword = Keyword("hour")
+  lazy val MINUTES: Keyword = Keyword("minutes")
+  lazy val MINUTE: Keyword = Keyword("minute")
+  lazy val SECONDS: Keyword = Keyword("seconds")
+  lazy val SECOND: Keyword = Keyword("second")
+  lazy val MILLIS: Keyword = Keyword("millis")
+  lazy val MILLI: Keyword = Keyword("milli")
+  lazy val ROWS: Keyword = Keyword("rows")
+  lazy val STAR: Keyword = Keyword("*")
+  lazy val GET: Keyword = Keyword("get")
+  lazy val FLATTEN: Keyword = Keyword("flatten")
+  lazy val OVER: Keyword = Keyword("over")
+  lazy val DISTINCT: Keyword = Keyword("distinct")
+  lazy val CURRENT_ROW: Keyword = Keyword("current_row")
+  lazy val CURRENT_RANGE: Keyword = Keyword("current_range")
+  lazy val UNBOUNDED_ROW: Keyword = Keyword("unbounded_row")
+  lazy val UNBOUNDED_RANGE: Keyword = Keyword("unbounded_range")
+  lazy val ROWTIME: Keyword = Keyword("rowtime")
+  lazy val PROCTIME: Keyword = Keyword("proctime")
+  lazy val TRUE: Keyword = Keyword("true")
+  lazy val FALSE: Keyword = Keyword("false")
+  lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY")
+  lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY")
+  lazy val MAP: Keyword = Keyword("MAP")
+  lazy val BYTE: Keyword = Keyword("BYTE")
+  lazy val SHORT: Keyword = Keyword("SHORT")
+  lazy val INTERVAL_MONTHS: Keyword = Keyword("INTERVAL_MONTHS")
+  lazy val INTERVAL_MILLIS: Keyword = Keyword("INTERVAL_MILLIS")
+  lazy val INT: Keyword = Keyword("INT")
+  lazy val LONG: Keyword = Keyword("LONG")
+  lazy val FLOAT: Keyword = Keyword("FLOAT")
+  lazy val DOUBLE: Keyword = Keyword("DOUBLE")
+  lazy val BOOLEAN: Keyword = Keyword("BOOLEAN")
+  lazy val STRING: Keyword = Keyword("STRING")
+  lazy val SQL_DATE: Keyword = Keyword("SQL_DATE")
+  lazy val SQL_TIMESTAMP: Keyword = Keyword("SQL_TIMESTAMP")
+  lazy val SQL_TIME: Keyword = Keyword("SQL_TIME")
+  lazy val DECIMAL: Keyword = Keyword("DECIMAL")
+  lazy val TRIM_MODE_LEADING: Keyword = Keyword("LEADING")
+  lazy val TRIM_MODE_TRAILING: Keyword = Keyword("TRAILING")
+  lazy val TRIM_MODE_BOTH: Keyword = Keyword("BOTH")
+  lazy val TO: Keyword = Keyword("TO")
+
+  def functionIdent: PlannerExpressionParserImpl.Parser[String] = super.ident
+
+  // symbols
+
+  lazy val timeIntervalUnit: PackratParser[Expression] = TimeIntervalUnit.values map {
+    unit: TimeIntervalUnit => literal(unit.toString) ^^^ valueLiteral(unit)
+  } reduceLeft(_ | _)
+
+  lazy val timePointUnit: PackratParser[Expression] = TimePointUnit.values map {
+    unit: TimePointUnit => literal(unit.toString) ^^^ valueLiteral(unit)
+  } reduceLeft(_ | _)
+
+  lazy val currentRange: PackratParser[Expression] = CURRENT_RANGE ^^ {
+    _ => unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE)
+  }
+
+  lazy val currentRow: PackratParser[Expression] = CURRENT_ROW ^^ {
+    _ => unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW)
+  }
+
+  lazy val unboundedRange: PackratParser[Expression] = UNBOUNDED_RANGE ^^ {
+    _ => unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE)
+  }
+
+  lazy val unboundedRow: PackratParser[Expression] = UNBOUNDED_ROW ^^ {
+    _ => unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_ROW)
+  }
+
+  lazy val overConstant: PackratParser[Expression] =
+    currentRange | currentRow | unboundedRange | unboundedRow
+
+  lazy val trimMode: PackratParser[String] =
+    TRIM_MODE_LEADING | TRIM_MODE_TRAILING | TRIM_MODE_BOTH
+
+  // data types
+
+  lazy val dataType: PackratParser[TypeInformation[_]] =
+    PRIMITIVE_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.PRIMITIVE_ARRAY(ct) } |
+    OBJECT_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.OBJECT_ARRAY(ct) } |
+    MAP ~ "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ { mt => Types.MAP(mt._1._1, mt._2)} |
+    BYTE ^^ { e => Types.BYTE } |
+    SHORT ^^ { e => Types.SHORT } |
+    INTERVAL_MONTHS ^^ { e => Types.INTERVAL_MONTHS } |
+    INTERVAL_MILLIS ^^ { e => Types.INTERVAL_MILLIS } |
+    INT ^^ { e => Types.INT } |
+    LONG ^^ { e => Types.LONG } |
+    FLOAT ^^ { e => Types.FLOAT } |
+    DOUBLE ^^ { e => Types.DOUBLE } |
+    BOOLEAN ^^ { { e => Types.BOOLEAN } } |
+    STRING ^^ { e => Types.STRING } |
+    SQL_DATE ^^ { e => Types.SQL_DATE } |
+    SQL_TIMESTAMP ^^ { e => Types.SQL_TIMESTAMP } |
+    SQL_TIME ^^ { e => Types.SQL_TIME } |
+    DECIMAL ^^ { e => Types.DECIMAL }
+
+  // literals
+
+  // same as floatingPointNumber but we do not allow trailing dot "12.d" or "2."
+  lazy val floatingPointNumberFlink: Parser[String] =
+    """-?(\d+(\.\d+)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r
+
+  lazy val numberLiteral: PackratParser[Expression] =
+    (wholeNumber <~ ("l" | "L")) ^^ { n => valueLiteral(n.toLong) } |
+      (decimalNumber <~ ("p" | "P")) ^^ { n => valueLiteral(new JBigDecimal(n)) } |
+      (floatingPointNumberFlink | decimalNumber) ^^ {
+        n =>
+          if (n.matches("""-?\d+""")) {
+            valueLiteral(n.toInt)
+          } else if (n.endsWith("f") || n.endsWith("F")) {
+            valueLiteral(n.toFloat)
+          } else {
+            valueLiteral(n.toDouble)
+          }
+      }
+
+  // string with single quotes such as 'It''s me.'
+  lazy val singleQuoteStringLiteral: Parser[Expression] = "'(?:''|[^'])*'".r ^^ {
+    str =>
+      val escaped = str.substring(1, str.length - 1).replace("''", "'")
+      valueLiteral(escaped)
+  }
+
+  // string with double quotes such as "I ""like"" dogs."
+  lazy val doubleQuoteStringLiteral: PackratParser[Expression] = "\"(?:\"\"|[^\"])*\"".r ^^ {
+    str =>
+      val escaped = str.substring(1, str.length - 1).replace("\"\"", "\"")
+      valueLiteral(escaped)
+  }
+
+  lazy val boolLiteral: PackratParser[Expression] = (TRUE | FALSE) ^^ {
+    str => valueLiteral(str.toBoolean)
+  }
+
+  lazy val nullLiteral: PackratParser[Expression] = (NULL | NULL_OF) ~ "(" ~> dataType <~ ")" ^^ {
+    dt => valueLiteral(null, fromLegacyInfoToDataType(dt))
+  }
+
+  lazy val literalExpr: PackratParser[Expression] =
+    numberLiteral | doubleQuoteStringLiteral | singleQuoteStringLiteral | boolLiteral
+
+  lazy val fieldReference: PackratParser[UnresolvedReferenceExpression] = (STAR | ident) ^^ {
+    sym => unresolvedRef(sym)
+  }
+
+  lazy val atom: PackratParser[Expression] =
+    ( "(" ~> expression <~ ")" ) | (fieldReference ||| literalExpr)
+
+  lazy val over: PackratParser[Expression] = composite ~ OVER ~ fieldReference ^^ {
+    case agg ~ _ ~ windowRef =>
+      unresolvedCall(BuiltInFunctionDefinitions.OVER, agg, windowRef)
+  }
+
+  // suffix operators
+
+  lazy val suffixAsc : PackratParser[Expression] = composite <~ "." ~ ASC ~ opt("()") ^^ { e =>
+      unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, e)
+  }
+
+  lazy val suffixDesc : PackratParser[Expression] = composite <~ "." ~ DESC ~ opt("()") ^^ { e =>
+    unresolvedCall(BuiltInFunctionDefinitions.ORDER_DESC, e)
+  }
+
+  lazy val suffixCast: PackratParser[Expression] =
+    composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
+      case e ~ _ ~ _ ~ _ ~ dt ~ _ =>
+        unresolvedCall(
+          BuiltInFunctionDefinitions.CAST,
+          e,
+          typeLiteral(fromLegacyInfoToDataType(dt)))
+    }
+
+  lazy val suffixTrim: PackratParser[Expression] =
+    composite ~ "." ~ TRIM ~ "(" ~ trimMode ~
+        "," ~ expression ~ ")" ^^ {
+      case operand ~ _ ~ _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ =>
+        unresolvedCall(
+          BuiltInFunctionDefinitions.TRIM,
+          valueLiteral(mode == TRIM_MODE_LEADING.key || mode == TRIM_MODE_BOTH.key),
+          valueLiteral(mode == TRIM_MODE_TRAILING.key || mode == TRIM_MODE_BOTH.key),
+          trimCharacter,
+          operand)
+    }
+
+  lazy val suffixTrimWithoutArgs: PackratParser[Expression] =
+    composite <~ "." ~ TRIM ~ opt("()") ^^ {
+      e =>
+        unresolvedCall(
+          BuiltInFunctionDefinitions.TRIM,
+          valueLiteral(true),
+          valueLiteral(true),
+          valueLiteral(" "),
+          e)
+    }
+
+  lazy val suffixIf: PackratParser[Expression] =
+    composite ~ "." ~ IF ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.IF, condition, ifTrue, ifFalse)
+    }
+
+  lazy val suffixExtract: PackratParser[Expression] =
+    composite ~ "." ~ EXTRACT ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+      case operand ~ _  ~ _ ~ _ ~ unit ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.EXTRACT, unit, operand)
+    }
+
+  lazy val suffixFloor: PackratParser[Expression] =
+    composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+      case operand ~ _  ~ _ ~ _ ~ unit ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.FLOOR, unit, operand)
+    }
+
+  lazy val suffixCeil: PackratParser[Expression] =
+    composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+      case operand ~ _  ~ _ ~ _ ~ unit ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.CEIL, unit, operand)
+    }
+
+  // required because op.log(base) changes order of a parameters
+  lazy val suffixLog: PackratParser[Expression] =
+    composite ~ "." ~ LOG ~ "(" ~ expression ~ ")" ^^ {
+      case operand ~ _ ~ _ ~ _ ~ base ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.LOG, base, operand)
+    }
+
+  lazy val suffixFunctionCall: PackratParser[Expression] =
+    composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+    case operand ~ _ ~ name ~ _ ~ args ~ _ =>
+      lookupCall(name, operand :: args: _*)
+  }
+
+  lazy val suffixFunctionCallOneArg: PackratParser[Expression] =
+    composite ~ "." ~ functionIdent ^^ {
+      case operand ~ _ ~ name =>
+        lookupCall(name, operand)
+    }
+
+  lazy val suffixToDate: PackratParser[Expression] =
+    composite <~ "." ~ TO_DATE ~ opt("()") ^^ { e =>
+      unresolvedCall(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))
+    }
+
+  lazy val suffixToTimestamp: PackratParser[Expression] =
+    composite <~ "." ~ TO_TIMESTAMP ~ opt("()") ^^ { e =>
+      unresolvedCall(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))
+    }
+
+  lazy val suffixToTime: PackratParser[Expression] =
+    composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e =>
+      unresolvedCall(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))
+    }
+
+  lazy val suffixTimeInterval : PackratParser[Expression] =
+    composite ~ "." ~ (YEARS | QUARTERS | MONTHS | WEEKS | DAYS |  HOURS | MINUTES |
+      SECONDS | MILLIS | YEAR | QUARTER | MONTH | WEEK | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
+
+    case expr ~ _ ~ (YEARS.key | YEAR.key) => toMonthInterval(expr, 12)
+
+    case expr ~ _ ~ (QUARTERS.key | QUARTER.key) => toMonthInterval(expr, 3)
+
+    case expr ~ _ ~ (MONTHS.key | MONTH.key) => toMonthInterval(expr, 1)
+
+    case expr ~ _ ~ (WEEKS.key | WEEK.key) => toMilliInterval(expr, 7 * MILLIS_PER_DAY)
+
+    case expr ~ _ ~ (DAYS.key | DAY.key) => toMilliInterval(expr, MILLIS_PER_DAY)
+
+    case expr ~ _ ~ (HOURS.key | HOUR.key) => toMilliInterval(expr, MILLIS_PER_HOUR)
+
+    case expr ~ _ ~ (MINUTES.key | MINUTE.key) => toMilliInterval(expr, MILLIS_PER_MINUTE)
+
+    case expr ~ _ ~ (SECONDS.key | SECOND.key) => toMilliInterval(expr, MILLIS_PER_SECOND)
+
+    case expr ~ _ ~ (MILLIS.key | MILLI.key)=> toMilliInterval(expr, 1)
+  }
+
+  lazy val suffixRowInterval : PackratParser[Expression] =
+    composite <~ "." ~ ROWS ^^ { e => toRowInterval(e) }
+
+  lazy val suffixGet: PackratParser[Expression] =
+    composite ~ "." ~ GET ~ "(" ~ literalExpr ~ ")" ^^ {
+      case e ~ _ ~ _ ~ _ ~ index ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.GET, e, index)
+    }
+
+  lazy val suffixFlattening: PackratParser[Expression] =
+    composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e =>
+      unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, e)
+    }
+
+  lazy val suffixDistinct: PackratParser[Expression] =
+    composite <~ "." ~ DISTINCT ~ opt("()") ^^ { e =>
+      unresolvedCall(BuiltInFunctionDefinitions.DISTINCT, e)
+    }
+
+  lazy val suffixAs: PackratParser[Expression] =
+    composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+      case e ~ _ ~ _ ~ _ ~ names ~ _ =>
+        unresolvedCall(
+          BuiltInFunctionDefinitions.AS,
+          e :: names.map(n => valueLiteral(n.getName)): _*)
+  }
+
+  lazy val suffixed: PackratParser[Expression] =
+    // expressions that need to be resolved early
+    suffixFlattening |
+    // expressions that need special expression conversion
+    suffixAs | suffixTimeInterval | suffixRowInterval | suffixToTimestamp | suffixToTime |
+    suffixToDate |
+    // expression for log
+    suffixLog |
+    // expression for ordering
+    suffixAsc | suffixDesc |
+    // expressions that take enumerations
+    suffixCast | suffixTrim | suffixTrimWithoutArgs | suffixExtract | suffixFloor | suffixCeil |
+    // expressions that take literals
+    suffixGet |
+    // expression with special identifier
+    suffixIf |
+    // expression with distinct suffix modifier
+    suffixDistinct |
+    // function call must always be at the end
+    suffixFunctionCall | suffixFunctionCallOneArg |
+    // rowtime or proctime
+    timeIndicator
+
+  // prefix operators
+
+  lazy val prefixCast: PackratParser[Expression] =
+    CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
+      case _ ~ _ ~ e ~ _ ~ dt ~ _ =>
+        unresolvedCall(
+          BuiltInFunctionDefinitions.CAST,
+          e,
+          typeLiteral(fromLegacyInfoToDataType(dt)))
+    }
+
+  lazy val prefixIf: PackratParser[Expression] =
+    IF ~ "(" ~ expression ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case _ ~ _ ~ condition ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.IF, condition, ifTrue, ifFalse)
+    }
+
+  lazy val prefixFunctionCall: PackratParser[Expression] =
+    functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+      case name ~ _ ~ args ~ _ =>
+        lookupCall(name, args: _*)
+    }
+
+  lazy val prefixFunctionCallOneArg: PackratParser[Expression] =
+    functionIdent ~ "(" ~ expression ~ ")" ^^ {
+      case name ~ _ ~ arg ~ _ =>
+        lookupCall(name, arg)
+    }
+
+  lazy val prefixTrim: PackratParser[Expression] =
+    TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ =>
+        unresolvedCall(
+          BuiltInFunctionDefinitions.TRIM,
+          valueLiteral(mode == TRIM_MODE_LEADING.key || mode == TRIM_MODE_BOTH.key),
+          valueLiteral(mode == TRIM_MODE_TRAILING.key || mode == TRIM_MODE_BOTH.key),
+          trimCharacter,
+          operand)
+    }
+
+  lazy val prefixTrimWithoutArgs: PackratParser[Expression] =
+    TRIM ~ "(" ~ expression ~ ")" ^^ {
+      case _ ~ _ ~ operand ~ _ =>
+        unresolvedCall(
+          BuiltInFunctionDefinitions.TRIM,
+          valueLiteral(true),
+          valueLiteral(true),
+          valueLiteral(" "),
+          operand)
+    }
+
+  lazy val prefixExtract: PackratParser[Expression] =
+    EXTRACT ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+      case _ ~ _ ~ operand ~ _ ~ unit ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.EXTRACT, unit, operand)
+      }
+
+  lazy val prefixTimestampDiff: PackratParser[Expression] =
+    TIMESTAMP_DIFF ~ "(" ~ timePointUnit ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case _ ~ _ ~ unit ~ _ ~ operand1 ~ _ ~ operand2 ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.TIMESTAMP_DIFF, unit, operand1, operand2)
+    }
+
+  lazy val prefixFloor: PackratParser[Expression] =
+    FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+      case _ ~ _ ~ operand ~ _ ~ unit ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.FLOOR, unit, operand)
+    }
+
+  lazy val prefixCeil: PackratParser[Expression] =
+    CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+      case _ ~ _ ~ operand ~ _ ~ unit ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.CEIL, unit, operand)
+    }
+
+  lazy val prefixGet: PackratParser[Expression] =
+    GET ~ "(" ~ composite ~ ","  ~ literalExpr ~ ")" ^^ {
+      case _ ~ _ ~ e ~ _ ~ index ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.GET, e, index)
+    }
+
+  lazy val prefixFlattening: PackratParser[Expression] =
+    FLATTEN ~ "(" ~> composite <~ ")" ^^ { e =>
+      unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, e)
+    }
+
+  lazy val prefixToDate: PackratParser[Expression] =
+    TO_DATE ~ "(" ~> expression <~ ")" ^^ { e =>
+      unresolvedCall(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))
+    }
+
+  lazy val prefixToTimestamp: PackratParser[Expression] =
+    TO_TIMESTAMP ~ "(" ~> expression <~ ")" ^^ { e =>
+      unresolvedCall(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))
+    }
+
+  lazy val prefixToTime: PackratParser[Expression] =
+    TO_TIME ~ "(" ~> expression <~ ")" ^^ { e =>
+      unresolvedCall(
+        BuiltInFunctionDefinitions.CAST,
+        e,
+        typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))
+    }
+
+  lazy val prefixDistinct: PackratParser[Expression] =
+    functionIdent ~ "." ~ DISTINCT ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+      case name ~ _ ~ _ ~ _ ~ args ~ _ =>
+        unresolvedCall(BuiltInFunctionDefinitions.DISTINCT, lookupCall(name, args: _*))
+    }
+
+  lazy val prefixAs: PackratParser[Expression] =
+    AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+      case _ ~ _ ~ e ~ _ ~ names ~ _ =>
+        unresolvedCall(
+          BuiltInFunctionDefinitions.AS,
+          e :: names.map(n => valueLiteral(n.getName)): _*)
+    }
+
+  lazy val prefixed: PackratParser[Expression] =
+    // expressions that need to be resolved early
+    prefixFlattening |
+    // expressions that need special expression conversion
+    prefixAs| prefixToTimestamp | prefixToTime | prefixToDate |
+    // expressions that take enumerations
+    prefixCast | prefixTrim | prefixTrimWithoutArgs | prefixExtract | prefixFloor | prefixCeil |
+      prefixTimestampDiff |
+    // expressions that take literals
+    prefixGet |
+    // expression with special identifier
+    prefixIf |
+    // expression with prefix distinct
+    prefixDistinct |
+    // function call must always be at the end
+    prefixFunctionCall | prefixFunctionCallOneArg
+
+  // suffix/prefix composite
+
+  lazy val composite: PackratParser[Expression] =
+    over | suffixed | nullLiteral | prefixed | atom |
+    failure("Composite expression expected.")
+
+  // unary ops
+
+  lazy val unaryNot: PackratParser[Expression] = "!" ~> composite ^^ { e =>
+    unresolvedCall(BuiltInFunctionDefinitions.NOT, e)
+  }
+
+  lazy val unaryMinus: PackratParser[Expression] = "-" ~> composite ^^ { e =>
+    unresolvedCall(BuiltInFunctionDefinitions.MINUS_PREFIX, e)
+  }
+
+  lazy val unaryPlus: PackratParser[Expression] = "+" ~> composite ^^ { e => e }
+
+  lazy val unary: PackratParser[Expression] = composite | unaryNot | unaryMinus | unaryPlus |
+    failure("Unary expression expected.")
+
+  // arithmetic
+
+  lazy val product: PackratParser[Expression] = unary * (
+  "*" ^^^ {
+    (a:Expression, b:Expression) => unresolvedCall(BuiltInFunctionDefinitions.TIMES, a, b)
+  } | "/" ^^^ {
+    (a:Expression, b:Expression) => unresolvedCall(BuiltInFunctionDefinitions.DIVIDE, a, b)
+  } | "%" ^^^ {
+    (a:Expression, b:Expression) => unresolvedCall(BuiltInFunctionDefinitions.MOD, a, b)
+  }) | failure("Product expected.")
+
+  lazy val term: PackratParser[Expression] = product * (
+    "+" ^^^ {
+      (a:Expression, b:Expression) => unresolvedCall(BuiltInFunctionDefinitions.PLUS, a, b)
+    } | "-" ^^^ {
+      (a:Expression, b:Expression) => unresolvedCall(BuiltInFunctionDefinitions.MINUS, a, b)
+    }) | failure("Term expected.")
+
+  // comparison
+
+  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "==" | "=") ~ term ^^ {
+    case l ~ _ ~ r => unresolvedCall(BuiltInFunctionDefinitions.EQUALS, l, r)
+  }
+
+  lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
+    case l ~ _ ~ r => unresolvedCall(BuiltInFunctionDefinitions.NOT_EQUALS, l, r)
+  }
+
+  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
+    case l ~ _ ~ r => unresolvedCall(BuiltInFunctionDefinitions.GREATER_THAN, l, r)
+  }
+
+  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
+    case l ~ _ ~ r => unresolvedCall(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, l, r)
+  }
+
+  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
+    case l ~ _ ~ r => unresolvedCall(BuiltInFunctionDefinitions.LESS_THAN, l, r)
+  }
+
+  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
+    case l ~ _ ~ r => unresolvedCall(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, l, r)
+  }
+
+  lazy val comparison: PackratParser[Expression] =
+    equalTo | notEqualTo |
+    greaterThan | greaterThanOrEqual |
+    lessThan | lessThanOrEqual | term |
+    failure("Comparison expected.")
+
+  // logic
+
+  lazy val logic: PackratParser[Expression] = comparison * (
+    "&&" ^^^ {
+      (a:Expression, b:Expression) => unresolvedCall(BuiltInFunctionDefinitions.AND, a, b)
+    } | "||" ^^^ {
+      (a:Expression, b:Expression) => unresolvedCall(BuiltInFunctionDefinitions.OR, a, b)
+    }) | failure("Logic expected.")
+
+  // time indicators
+
+  lazy val timeIndicator: PackratParser[Expression] = proctime | rowtime
+
+  lazy val proctime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
+    case f ~ _ ~ _ => unresolvedCall(BuiltInFunctionDefinitions.PROCTIME, f)
+  }
+
+  lazy val rowtime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
+    case f ~ _ ~ _ => unresolvedCall(BuiltInFunctionDefinitions.ROWTIME, f)
+  }
+
+  // alias
+
+  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
+      case e ~ _ ~ name =>
+        unresolvedCall(BuiltInFunctionDefinitions.AS, e, valueLiteral(name.getName))
+  } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+    case e ~ _ ~ _ ~ names ~ _ =>
+      unresolvedCall(
+        BuiltInFunctionDefinitions.AS,
+        e :: names.map(n => valueLiteral(n.getName)): _*)
+  } | logic
+
+  lazy val aliasMapping: PackratParser[Expression] =
+    fieldReference ~ AS ~ fieldReference ^^ {
+      case e ~ _ ~ name =>
+        unresolvedCall(BuiltInFunctionDefinitions.AS, e, valueLiteral(name.getName))
+  }
+
+  // columns
+
+  lazy val fieldNameRange: PackratParser[Expression] = fieldReference ~ TO ~ fieldReference ^^ {
+    case start ~ _ ~ end => unresolvedCall(BuiltInFunctionDefinitions.RANGE_TO, start, end)
+  }
+
+  lazy val fieldIndexRange: PackratParser[Expression] = numberLiteral ~ TO ~ numberLiteral ^^ {
+    case start ~ _ ~ end => unresolvedCall(BuiltInFunctionDefinitions.RANGE_TO, start, end)
+  }
+
+  lazy val range = fieldNameRange | fieldIndexRange
+
+  lazy val expression: PackratParser[Expression] = range | overConstant | alias |
+    failure("Invalid expression.")
+
+  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
+
+  def parseExpressionList(expression: String): JList[Expression] = {
+    parseAll(expressionList, expression) match {
+      case Success(lst, _) => lst
+
+      case NoSuccess(msg, next) =>
+        throwError(msg, next)
+    }
+  }
+
+  def parseExpression(exprString: String): Expression = {
+    parseAll(expression, exprString) match {
+      case Success(lst, _) => lst
+
+      case NoSuccess(msg, next) =>
+        throwError(msg, next)
+    }
+  }
+
+  private def throwError(msg: String, next: Input): Nothing = {
+    val improvedMsg = msg.replace("string matching regex `\\z'", "End of expression")
+
+    throw ExpressionParserException(
+      s"""Could not parse expression at column ${next.pos.column}: $improvedMsg
+        |${next.pos.longString}""".stripMargin)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
new file mode 100644
index 0000000..7f7397f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+object PlannerExpressionUtils {
+
+  private[flink] def isTimeIntervalLiteral(expr: PlannerExpression): Boolean = expr match {
+    case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
+    case _ => false
+  }
+
+  private[flink] def isRowCountLiteral(expr: PlannerExpression): Boolean = expr match {
+    case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) => true
+    case _ => false
+  }
+
+  private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr match {
+    case r: ResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
+      true
+    case _ => false
+  }
+
+  private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean = expr match {
+    case r: ResolvedFieldReference
+      if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) =>
+      true
+    case _ => false
+  }
+
+  private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean = expr match {
+    case r: ResolvedFieldReference
+      if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) =>
+      true
+    case _ => false
+  }
+
+  private[flink] def toTime(expr: PlannerExpression): FlinkTime = expr match {
+    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+      FlinkTime.milliseconds(value)
+    case _ => throw new IllegalArgumentException()
+  }
+
+  private[flink] def toLong(expr: PlannerExpression): Long = expr match {
+    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) => value
+    case _ => throw new IllegalArgumentException()
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
new file mode 100644
index 0000000..09174dc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.MultisetTypeInfo
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.functions.utils.AggSqlFunction
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.functions.{AggregateFunction, FunctionRequirement, TableAggregateFunction, UserDefinedAggregateFunction}
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.fun.SqlSumAggFunction
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.tools.RelBuilder.AggCall
+
+abstract sealed class Aggregation extends PlannerExpression {
+
+  override def toString = s"Aggregate"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("Aggregate cannot be transformed to RexNode")
+
+  /**
+    * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
+    */
+  private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false
+  )(implicit relBuilder: RelBuilder): AggCall
+
+  /**
+    * Returns the SqlAggFunction for this Aggregation.
+    */
+  private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction
+
+}
+
+case class DistinctAgg(child: PlannerExpression) extends Aggregation {
+
+  def distinct: PlannerExpression = DistinctAgg(child)
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    super.validateInput()
+    child match {
+      case agg: Aggregation =>
+        child.validateInput()
+      case _ =>
+        ValidationFailure(s"Distinct modifier cannot be applied to $child! " +
+          s"It can only be applied to an aggregation expression, for example, " +
+          s"'a.count.distinct which is equivalent with COUNT(DISTINCT a).")
+    }
+  }
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = true)(implicit relBuilder: RelBuilder) = {
+    child.asInstanceOf[Aggregation].toAggCall(name, isDistinct = true)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    child.asInstanceOf[Aggregation].getSqlAggFunction()
+  }
+
+  override private[flink] def children = Seq(child)
+}
+
+case class Sum(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"sum($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.SUM,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "sum")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    val returnType = relBuilder
+      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .createFieldTypeFromLogicalType(fromTypeInfoToLogicalType(resultType))
+    new SqlSumAggFunction(returnType)
+  }
+}
+
+case class Sum0(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"sum0($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.SUM0,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "sum0")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.SUM0
+}
+
+case class Min(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"min($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.MIN,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertOrderableExpr(child.resultType, "min")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.MIN
+  }
+}
+
+case class Max(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"max($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.MAX,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertOrderableExpr(child.resultType, "max")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.MAX
+  }
+}
+
+case class Count(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"count($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.COUNT,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.COUNT
+  }
+}
+
+case class Avg(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"avg($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.AVG,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "avg")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.AVG
+  }
+}
+
+/**
+  * Returns a multiset aggregates.
+  */
+case class Collect(child: PlannerExpression) extends Aggregation  {
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType: TypeInformation[_] =
+    MultisetTypeInfo.getInfoFor(child.resultType)
+
+  override def toString: String = s"collect($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.COLLECT,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.COLLECT
+  }
+}
+
+case class StddevPop(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"stddev_pop($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.STDDEV_POP,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "stddev_pop")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.STDDEV_POP
+}
+
+case class StddevSamp(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"stddev_samp($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.STDDEV_SAMP,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "stddev_samp")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.STDDEV_SAMP
+}
+
+case class VarPop(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"var_pop($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.VAR_POP,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "var_pop")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.VAR_POP
+}
+
+case class VarSamp(child: PlannerExpression) extends Aggregation {
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+  override def toString = s"var_samp($child)"
+
+  override private[flink] def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.VAR_SAMP,
+      isDistinct,
+      false,
+      null,
+      name,
+      child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "var_samp")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.VAR_SAMP
+}
+
+/**
+  * Expression for calling a user-defined aggregate function.
+  */
+case class AggFunctionCall(
+    val aggregateFunction: UserDefinedAggregateFunction[_, _],
+    resultTypeInfo: TypeInformation[_],
+    accTypeInfo: TypeInformation[_],
+    args: Seq[PlannerExpression])
+  extends Aggregation {
+
+  if (aggregateFunction.isInstanceOf[TableAggregateFunction[_, _]]) {
+    throw new UnsupportedOperationException("TableAggregateFunction is unsupported now.")
+  }
+
+  private val aggFunction = aggregateFunction.asInstanceOf[AggregateFunction[_, _]]
+
+  override private[flink] def children: Seq[PlannerExpression] = args
+
+  override def resultType: TypeInformation[_] = resultTypeInfo
+
+  override def validateInput(): ValidationResult = {
+    val signature = children.map(_.resultType)
+    // look for a signature that matches the input types
+    val foundSignature = getAccumulateMethodSignature(
+      aggFunction,
+      signature.map(fromTypeInfoToLogicalType))
+    if (foundSignature.isEmpty) {
+      ValidationFailure(s"Given parameters do not match any signature. \n" +
+                          s"Actual: ${
+                            signatureToString(signature.map(fromLegacyInfoToDataType))} \n" +
+                          s"Expected: ${
+                            getMethodSignatures(aggregateFunction, "accumulate")
+                              .map(_.drop(1))
+                              .map(signatureToString)
+                              .sorted  // make sure order to verify error messages in tests
+                              .mkString(", ")}")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override def toString: String = s"${aggregateFunction.getClass.getSimpleName}($args)"
+
+  override def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(
+      this.getSqlAggFunction(),
+      isDistinct,
+      false,
+      null,
+      name,
+      args.map(_.toRexNode): _*)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    val requiresOver = aggregateFunction match {
+      case a: AggregateFunction[_, _] =>
+        a.getRequirements.contains(FunctionRequirement.OVER_WINDOW_ONLY)
+      case _ => false
+    }
+
+    AggSqlFunction(
+      aggregateFunction.functionIdentifier,
+      aggregateFunction.toString,
+      aggFunction,
+      fromLegacyInfoToDataType(resultType),
+      fromLegacyInfoToDataType(accTypeInfo),
+      typeFactory,
+      requiresOver)
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(this.getSqlAggFunction(), args.map(_.toRexNode): _*)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
new file mode 100644
index 0000000..49212b4
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.{fromLogicalTypeToTypeInfo, fromTypeInfoToLogicalType}
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils._
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+abstract class BinaryArithmetic extends BinaryExpression {
+  private[flink] def sqlOperator: SqlOperator
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(sqlOperator, children.map(_.toRexNode))
+  }
+
+  override private[flink] def resultType: TypeInformation[_] =
+    TypeCoercion.widerTypeOf(
+      fromTypeInfoToLogicalType(left.resultType),
+      fromTypeInfoToLogicalType(right.resultType)) match {
+      case Some(t) => fromLogicalTypeToTypeInfo(t)
+      case None =>
+        throw new RuntimeException("This should never happen.")
+    }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!isNumeric(left.resultType) || !isNumeric(right.resultType)) {
+      ValidationFailure(s"The arithmetic '$this' requires both operands to be numeric, but was " +
+        s"'$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+case class Plus(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+  override def toString = s"($left + $right)"
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.PLUS
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    if(isString(left.resultType)) {
+      val castedRight = Cast(right, BasicTypeInfo.STRING_TYPE_INFO)
+      relBuilder.call(FlinkSqlOperatorTable.CONCAT, left.toRexNode, castedRight.toRexNode)
+    } else if(isString(right.resultType)) {
+      val castedLeft = Cast(left, BasicTypeInfo.STRING_TYPE_INFO)
+      relBuilder.call(FlinkSqlOperatorTable.CONCAT, castedLeft.toRexNode, right.toRexNode)
+    } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
+      relBuilder.call(FlinkSqlOperatorTable.PLUS, left.toRexNode, right.toRexNode)
+    } else if (isTimeInterval(left.resultType) && isTemporal(right.resultType)) {
+      // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left)
+      // we manually switch them here
+      relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, right.toRexNode, left.toRexNode)
+    } else if (isTemporal(left.resultType) && isTemporal(right.resultType)) {
+      relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, left.toRexNode, right.toRexNode)
+    } else {
+      val castedLeft = Cast(left, resultType)
+      val castedRight = Cast(right, resultType)
+      relBuilder.call(FlinkSqlOperatorTable.PLUS, castedLeft.toRexNode, castedRight.toRexNode)
+    }
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (isString(left.resultType) || isString(right.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
+      ValidationSuccess
+    } else if (isTimePoint(left.resultType) && isTimeInterval(right.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) {
+      ValidationSuccess
+    } else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"The arithmetic '$this' requires input that is numeric, string, time intervals of the " +
+        s"same type, or a time interval and a time point type, " +
+        s"but was '$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
+    }
+  }
+}
+
+case class UnaryMinus(child: PlannerExpression) extends UnaryExpression {
+  override def toString = s"-($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.UNARY_MINUS, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (isNumeric(child.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(child.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"The arithmetic '$this' requires input that is numeric or a time " +
+        s"interval type, but was '${child.resultType}'.")
+    }
+  }
+}
+
+case class Minus(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+  override def toString = s"($left - $right)"
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.MINUS
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
+      ValidationSuccess
+    } else if (isTimePoint(left.resultType) && isTimeInterval(right.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) {
+      ValidationSuccess
+    } else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"The arithmetic '$this' requires inputs that are numeric, time intervals of the same " +
+        s"type, or a time interval and a time point type, " +
+        s"but was '$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
+    }
+  }
+}
+
+case class Div(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+  override def toString = s"($left / $right)"
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.DIVIDE
+}
+
+case class Mul(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+  override def toString = s"($left * $right)"
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.MULTIPLY
+}
+
+case class Mod(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+  override def toString = s"($left % $right)"
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.MOD
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
new file mode 100644
index 0000000..b9b0b3f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import java.util
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rex.RexWindowBound._
+import org.apache.calcite.rex.{RexFieldCollation, RexNode, RexWindowBound}
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.OrdinalReturnTypeInference
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api._
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions._
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
+import org.apache.flink.table.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+  * Over call with unresolved alias for over window.
+  *
+  * @param agg The aggregation of the over call.
+  * @param alias The alias of the referenced over window.
+  */
+case class UnresolvedOverCall(agg: PlannerExpression, alias: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def validateInput() =
+    ValidationFailure(s"Over window with alias $alias could not be resolved.")
+
+  override private[flink] def resultType = agg.resultType
+
+  override private[flink] def children = Seq()
+}
+
+/**
+  * Over expression for Calcite over transform.
+  *
+  * @param agg            over-agg expression
+  * @param partitionBy    The fields by which the over window is partitioned
+  * @param orderBy        The field by which the over window is sorted
+  * @param preceding      The lower bound of the window
+  * @param following      The upper bound of the window
+  */
+case class OverCall(
+    agg: PlannerExpression,
+    partitionBy: Seq[PlannerExpression],
+    orderBy: PlannerExpression,
+    preceding: PlannerExpression,
+    following: PlannerExpression) extends PlannerExpression {
+
+  override def toString: String = s"$agg OVER (" +
+    s"PARTITION BY (${partitionBy.mkString(", ")}) " +
+    s"ORDER BY $orderBy " +
+    s"PRECEDING $preceding " +
+    s"FOLLOWING $following)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+
+    val rexBuilder = relBuilder.getRexBuilder
+
+    // assemble aggregation
+    val operator: SqlAggFunction = agg.asInstanceOf[Aggregation].getSqlAggFunction()
+    val aggResultType = relBuilder
+      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .createFieldTypeFromLogicalType(fromTypeInfoToLogicalType(agg.resultType))
+
+    // assemble exprs by agg children
+    val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava
+
+    // assemble order by key
+    val orderKey = new RexFieldCollation(orderBy.toRexNode, Set[SqlKind]().asJava)
+    val orderKeys = ImmutableList.of(orderKey)
+
+    // assemble partition by keys
+    val partitionKeys = partitionBy.map(_.toRexNode).asJava
+
+    // assemble bounds
+    val isPhysical: Boolean = preceding.resultType == BasicTypeInfo.LONG_TYPE_INFO
+
+    val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
+    val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
+
+    // build RexOver
+    rexBuilder.makeOver(
+      aggResultType,
+      operator,
+      aggExprs,
+      partitionKeys,
+      orderKeys,
+      lowerBound,
+      upperBound,
+      isPhysical,
+      true,
+      false,
+      false)
+  }
+
+  private def createBound(
+    relBuilder: RelBuilder,
+    bound: PlannerExpression,
+    sqlKind: SqlKind): RexWindowBound = {
+
+    bound match {
+      case _: UnboundedRow | _: UnboundedRange =>
+        val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+        create(unbounded, null)
+      case _: CurrentRow | _: CurrentRange =>
+        val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+        create(currentRow, null)
+      case b: Literal =>
+        val returnType = relBuilder
+          .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+          .createFieldTypeFromLogicalType(fromTypeInfoToLogicalType(Types.DECIMAL))
+
+        val sqlOperator = new SqlPostfixOperator(
+          sqlKind.name,
+          sqlKind,
+          2,
+          new OrdinalReturnTypeInference(0),
+          null,
+          null)
+
+        val operands: Array[SqlNode] = new Array[SqlNode](1)
+        operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)
+
+        val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+        val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+        expressions.add(relBuilder.literal(b.value))
+
+        val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
+
+        create(node, rexNode)
+    }
+  }
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    Seq(agg) ++ Seq(orderBy) ++ partitionBy ++ Seq(preceding) ++ Seq(following)
+
+  override private[flink] def resultType = agg.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+
+    // check that agg expression is aggregation
+    agg match {
+      case _: Aggregation =>
+        ValidationSuccess
+      case _ =>
+        return ValidationFailure(s"OVER can only be applied on an aggregation.")
+    }
+
+    // check partitionBy expression keys are resolved field reference
+    partitionBy.foreach {
+      case r: ResolvedFieldReference if r.resultType.isKeyType  =>
+        ValidationSuccess
+      case r: ResolvedFieldReference =>
+        return ValidationFailure(s"Invalid PartitionBy expression: $r. " +
+          s"Expression must return key type.")
+      case r =>
+        return ValidationFailure(s"Invalid PartitionBy expression: $r. " +
+          s"Expression must be a resolved field reference.")
+    }
+
+    // check preceding is valid
+    preceding match {
+      case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange =>
+        ValidationSuccess
+      case Literal(v: Long, BasicTypeInfo.LONG_TYPE_INFO) if v > 0 =>
+        ValidationSuccess
+      case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) =>
+        return ValidationFailure("Preceding row interval must be larger than 0.")
+      case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 =>
+        ValidationSuccess
+      case Literal(_, _: TimeIntervalTypeInfo[_]) =>
+        return ValidationFailure("Preceding time interval must be equal or larger than 0.")
+      case Literal(_, _) =>
+        return ValidationFailure("Preceding must be a row interval or time interval literal.")
+    }
+
+    // check following is valid
+    following match {
+      case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange =>
+        ValidationSuccess
+      case Literal(v: Long, BasicTypeInfo.LONG_TYPE_INFO) if v > 0 =>
+        ValidationSuccess
+      case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) =>
+        return ValidationFailure("Following row interval must be larger than 0.")
+      case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 =>
+        ValidationSuccess
+      case Literal(_, _: TimeIntervalTypeInfo[_]) =>
+        return ValidationFailure("Following time interval must be equal or larger than 0.")
+      case Literal(_, _) =>
+        return ValidationFailure("Following must be a row interval or time interval literal.")
+    }
+
+    // check that preceding and following are of same type
+    (preceding, following) match {
+      case (p: PlannerExpression, f: PlannerExpression) if p.resultType == f.resultType =>
+        ValidationSuccess
+      case _ =>
+        return ValidationFailure("Preceding and following must be of same interval type.")
+    }
+
+    // check time field
+    if (!PlannerExpressionUtils.isTimeAttribute(orderBy)) {
+      return ValidationFailure("Ordering must be defined on a time attribute.")
+    }
+
+    ValidationSuccess
+  }
+}
+
+/**
+  * Expression for calling a user-defined scalar functions.
+  *
+  * @param scalarFunction scalar function to be called (might be overloaded)
+  * @param parameters actual parameters that determine target evaluation method
+  */
+case class PlannerScalarFunctionCall(
+    scalarFunction: ScalarFunction,
+    parameters: Seq[PlannerExpression])
+  extends PlannerExpression {
+
+  private var signature: Array[LogicalType] = _
+
+  override private[flink] def children: Seq[PlannerExpression] = parameters
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    relBuilder.call(
+      createScalarSqlFunction(
+        scalarFunction.functionIdentifier,
+        scalarFunction.toString,
+        scalarFunction,
+        typeFactory),
+      parameters.map(_.toRexNode): _*)
+  }
+
+  override def toString =
+    s"${scalarFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
+
+  override private[flink] def resultType =
+    fromDataTypeToTypeInfo(getResultTypeOfScalarFunction(
+      scalarFunction,
+      Array(),
+      signature))
+
+  override private[flink] def validateInput(): ValidationResult = {
+    signature = children.map(_.resultType).map(fromTypeInfoToLogicalType).toArray
+    // look for a signature that matches the input types
+    val foundSignature = getEvalMethodSignatureOption(scalarFunction, signature)
+    if (foundSignature.isEmpty) {
+      ValidationFailure(s"Given parameters do not match any signature. \n" +
+        s"Actual: ${signatureToString(signature.map(fromLogicalTypeToDataType))} \n" +
+        s"Expected: ${signaturesToString(scalarFunction, "eval")}")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+/**
+  *
+  * Expression for calling a user-defined table function with actual parameters.
+  *
+  * @param functionName function name
+  * @param tableFunction user-defined table function
+  * @param parameters actual parameters of function
+  * @param resultType type information of returned table
+  */
+case class PlannerTableFunctionCall(
+    functionName: String,
+    tableFunction: TableFunction[_],
+    parameters: Seq[PlannerExpression],
+    resultType: TypeInformation[_])
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = parameters
+
+  override def validateInput(): ValidationResult = {
+    // check if not Scala object
+    UserFunctionsTypeHelper.validateNotSingleton(tableFunction.getClass)
+    // check if class could be instantiated
+    UserFunctionsTypeHelper.validateInstantiation(tableFunction.getClass)
+    // look for a signature that matches the input types
+    val signature = parameters.map(_.resultType).map(fromLegacyInfoToDataType)
+    val foundMethod = getUserDefinedMethod(
+      tableFunction, "eval", signature)
+    if (foundMethod.isEmpty) {
+      ValidationFailure(
+        s"Given parameters of function '$functionName' do not match any signature. \n" +
+          s"Actual: ${signatureToString(signature)} \n" +
+          s"Expected: ${signaturesToString(tableFunction, "eval")}")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override def toString =
+    s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/cast.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/cast.scala
new file mode 100644
index 0000000..98f4b2e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/cast.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+case class Cast(child: PlannerExpression, resultType: TypeInformation[_])
+  extends UnaryExpression {
+
+  override def toString = s"$child.cast($resultType)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val childRexNode = child.toRexNode
+    relBuilder
+      .getRexBuilder
+      // we use abstract cast here because RelBuilder.cast() has to many side effects
+      .makeAbstractCast(
+        typeFactory.createFieldTypeFromLogicalType(
+          fromTypeInfoToLogicalType(resultType).copy(childRexNode.getType.isNullable)),
+        childRexNode)
+  }
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
+    copy(child, resultType).asInstanceOf[this.type]
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (TypeCoercion.canCast(
+      fromTypeInfoToLogicalType(child.resultType),
+      fromTypeInfoToLogicalType(resultType))) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Unsupported cast from ${child.resultType} to $resultType")
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/collection.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/collection.scala
new file mode 100644
index 0000000..dcafe7e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/collection.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils.{isArray, isMap}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class RowConstructor(elements: Seq[PlannerExpression]) extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = elements
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val relDataType = relBuilder
+      .asInstanceOf[FlinkRelBuilder]
+      .getTypeFactory
+      .createFieldTypeFromLogicalType(fromTypeInfoToLogicalType(resultType).copy(false))
+    val values = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(relDataType, FlinkSqlOperatorTable.ROW, values)
+  }
+
+  override def toString = s"row(${elements.mkString(", ")})"
+
+  override private[flink] def resultType: TypeInformation[_] = new RowTypeInfo(
+    elements.map(e => e.resultType):_*
+  )
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      return ValidationFailure("Empty rows are not supported yet.")
+    }
+    ValidationSuccess
+  }
+}
+
+case class ArrayConstructor(elements: Seq[PlannerExpression]) extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = elements
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val relDataType = relBuilder
+      .asInstanceOf[FlinkRelBuilder]
+      .getTypeFactory
+      .createFieldTypeFromLogicalType(fromTypeInfoToLogicalType(resultType).copy(false))
+    val values = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(relDataType, FlinkSqlOperatorTable.ARRAY_VALUE_CONSTRUCTOR, values)
+  }
+
+  override def toString = s"array(${elements.mkString(", ")})"
+
+  override private[flink] def resultType = ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      return ValidationFailure("Empty arrays are not supported yet.")
+    }
+    val elementType = elements.head.resultType
+    if (!elements.forall(_.resultType == elementType)) {
+      ValidationFailure("Not all elements of the array have the same type.")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+case class MapConstructor(elements: Seq[PlannerExpression]) extends PlannerExpression {
+  override private[flink] def children: Seq[PlannerExpression] = elements
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val relDataType = typeFactory.createMapType(
+      typeFactory.createFieldTypeFromLogicalType(
+        fromTypeInfoToLogicalType(elements.head.resultType)),
+      typeFactory.createFieldTypeFromLogicalType(
+        fromTypeInfoToLogicalType(elements.last.resultType))
+    )
+    val values = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(relDataType, FlinkSqlOperatorTable.MAP_VALUE_CONSTRUCTOR, values)
+  }
+
+  override def toString = s"map(${elements
+    .grouped(2)
+    .map(x => s"[${x.mkString(": ")}]").mkString(", ")})"
+
+  override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo(
+    elements.head.resultType,
+    elements.last.resultType
+  )
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      return ValidationFailure("Empty maps are not supported yet.")
+    }
+    if (elements.size % 2 != 0) {
+      return ValidationFailure("Maps must have an even number of elements to form key-value pairs.")
+    }
+    if (!elements.grouped(2).forall(_.head.resultType == elements.head.resultType)) {
+      return ValidationFailure("Not all key elements of the map literal have the same type.")
+    }
+    if (!elements.grouped(2).forall(_.last.resultType == elements.last.resultType)) {
+      return ValidationFailure("Not all value elements of the map literal have the same type.")
+    }
+    ValidationSuccess
+  }
+}
+
+case class ArrayElement(array: PlannerExpression) extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(array)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(FlinkSqlOperatorTable.ELEMENT, array.toRexNode)
+  }
+
+  override def toString = s"($array).element()"
+
+  override private[flink] def resultType = array.resultType match {
+    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+    case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
+    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    array.resultType match {
+      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
+      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
+    }
+  }
+}
+
+case class Cardinality(container: PlannerExpression) extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(container)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(FlinkSqlOperatorTable.CARDINALITY, container.toRexNode)
+  }
+
+  override def toString = s"($container).cardinality()"
+
+  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    container.resultType match {
+      case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess
+      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
+      case other@_ => ValidationFailure(s"Array or map expected but was '$other'.")
+    }
+  }
+}
+
+case class ItemAt(container: PlannerExpression, key: PlannerExpression) extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(container, key)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(FlinkSqlOperatorTable.ITEM, container.toRexNode, key.toRexNode)
+  }
+
+  override def toString = s"($container).at($key)"
+
+  override private[flink] def resultType = container.resultType match {
+    case mti: MapTypeInfo[_, _] => mti.getValueTypeInfo
+    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+    case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
+    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    container.resultType match {
+
+      case ati: TypeInformation[_] if isArray(ati)  =>
+        if (key.resultType == INT_TYPE_INFO) {
+          // check for common user mistake
+          key match {
+            case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
+              ValidationFailure(
+                s"Array element access needs an index starting at 1 but was $value.")
+            case _ => ValidationSuccess
+          }
+        } else {
+          ValidationFailure(
+            s"Array element access needs an integer index but was '${key.resultType}'.")
+        }
+
+      case mti: MapTypeInfo[_, _]  =>
+        if (key.resultType == mti.getKeyTypeInfo) {
+          ValidationSuccess
+        } else {
+          ValidationFailure(
+            s"Map entry access needs a valid key of type " +
+              s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
+        }
+
+      case other@_ => ValidationFailure(s"Array or map expected but was '$other'.")
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/comparison.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/comparison.scala
new file mode 100644
index 0000000..c1517cc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/comparison.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils.{isArray, isComparable, isNumeric}
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+abstract class BinaryComparison extends BinaryExpression {
+  private[flink] def sqlOperator: SqlOperator
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(sqlOperator, children.map(_.toRexNode))
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    (left.resultType, right.resultType) match {
+      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+      case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
+      case (lType, rType) =>
+        ValidationFailure(
+          s"Comparison is only supported for numeric types and " +
+            s"comparable types of same type, got $lType and $rType")
+    }
+}
+
+case class EqualTo(left: PlannerExpression, right: PlannerExpression) extends BinaryComparison {
+  override def toString = s"$left === $right"
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.EQUALS
+
+  override private[flink] def validateInput(): ValidationResult =
+    (left.resultType, right.resultType) match {
+      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+      case (lType, rType) if lType == rType => ValidationSuccess
+      case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
+        ValidationSuccess
+      case (lType, rType) =>
+        ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType")
+    }
+}
+
+case class NotEqualTo(left: PlannerExpression, right: PlannerExpression) extends BinaryComparison {
+  override def toString = s"$left !== $right"
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.NOT_EQUALS
+
+  override private[flink] def validateInput(): ValidationResult =
+    (left.resultType, right.resultType) match {
+      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+      case (lType, rType) if lType == rType => ValidationSuccess
+      case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
+        ValidationSuccess
+      case (lType, rType) =>
+        ValidationFailure(s"Inequality predicate on incompatible types: $lType and $rType")
+    }
+}
+
+case class GreaterThan(left: PlannerExpression, right: PlannerExpression) extends BinaryComparison {
+  override def toString = s"$left > $right"
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.GREATER_THAN
+}
+
+case class GreaterThanOrEqual(left: PlannerExpression, right: PlannerExpression)
+  extends BinaryComparison {
+  override def toString = s"$left >= $right"
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL
+}
+
+case class LessThan(left: PlannerExpression, right: PlannerExpression) extends BinaryComparison {
+  override def toString = s"$left < $right"
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.LESS_THAN
+}
+
+case class LessThanOrEqual(left: PlannerExpression, right: PlannerExpression)
+  extends BinaryComparison {
+  override def toString = s"$left <= $right"
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL
+}
+
+case class IsNull(child: PlannerExpression) extends UnaryExpression {
+  override def toString = s"($child).isNull"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.isNull(child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsNotNull(child: PlannerExpression) extends UnaryExpression {
+  override def toString = s"($child).isNotNull"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.isNotNull(child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsTrue(child: PlannerExpression) extends UnaryExpression {
+  override def toString = s"($child).isTrue"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.IS_TRUE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsFalse(child: PlannerExpression) extends UnaryExpression {
+  override def toString = s"($child).isFalse"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.IS_FALSE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsNotTrue(child: PlannerExpression) extends UnaryExpression {
+  override def toString = s"($child).isNotTrue"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.IS_NOT_TRUE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsNotFalse(child: PlannerExpression) extends UnaryExpression {
+  override def toString = s"($child).isNotFalse"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.IS_NOT_FALSE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+abstract class BetweenComparison(
+    expr: PlannerExpression,
+    lowerBound: PlannerExpression,
+    upperBound: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(expr, lowerBound, upperBound)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    (expr.resultType, lowerBound.resultType, upperBound.resultType) match {
+      case (exprType, lowerType, upperType)
+          if isNumeric(exprType) && isNumeric(lowerType) && isNumeric(upperType) =>
+        ValidationSuccess
+      case (exprType, lowerType, upperType)
+          if isComparable(exprType) && exprType == lowerType && exprType == upperType =>
+        ValidationSuccess
+      case (exprType, lowerType, upperType) =>
+        ValidationFailure(
+          s"Between is only supported for numeric types and " +
+            s"identical comparable types, but got $exprType, $lowerType and $upperType"
+        )
+    }
+  }
+}
+
+case class Between(
+    expr: PlannerExpression,
+    lowerBound: PlannerExpression,
+    upperBound: PlannerExpression)
+  extends BetweenComparison(expr, lowerBound, upperBound) {
+
+  override def toString: String = s"($expr).between($lowerBound, $upperBound)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.and(
+      relBuilder.call(
+        FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL,
+        expr.toRexNode,
+        lowerBound.toRexNode
+      ),
+      relBuilder.call(
+        FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL,
+        expr.toRexNode,
+        upperBound.toRexNode
+      )
+    )
+  }
+}
+
+case class NotBetween(
+    expr: PlannerExpression,
+    lowerBound: PlannerExpression,
+    upperBound: PlannerExpression)
+  extends BetweenComparison(expr, lowerBound, upperBound) {
+
+  override def toString: String = s"($expr).notBetween($lowerBound, $upperBound)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.or(
+      relBuilder.call(
+        FlinkSqlOperatorTable.LESS_THAN,
+        expr.toRexNode,
+        lowerBound.toRexNode
+      ),
+      relBuilder.call(
+        FlinkSqlOperatorTable.GREATER_THAN,
+        expr.toRexNode,
+        upperBound.toRexNode
+      )
+    )
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
new file mode 100644
index 0000000..1f858a1
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.UnresolvedException
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+/**
+  * Flattening of composite types. All flattenings are resolved into
+  * `GetCompositeField` expressions.
+  */
+case class Flattening(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Invalcall to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ValidationResult =
+    ValidationFailure(s"Unresolved flattening of $child")
+}
+
+case class GetCompositeField(child: PlannerExpression, key: Any) extends UnaryExpression {
+
+  private var fieldIndex: Option[Int] = None
+
+  override def toString = s"$child.get($key)"
+
+  override private[flink] def validateInput(): ValidationResult = {
+    // check for composite type
+    if (!child.resultType.isInstanceOf[CompositeType[_]]) {
+      return ValidationFailure(s"Cannot access field of non-composite type '${child.resultType}'.")
+    }
+    val compositeType = child.resultType.asInstanceOf[CompositeType[_]]
+
+    // check key
+    key match {
+      case name: String =>
+        val index = compositeType.getFieldIndex(name)
+        if (index < 0) {
+          ValidationFailure(s"Field name '$name' could not be found.")
+        } else {
+          fieldIndex = Some(index)
+          ValidationSuccess
+        }
+      case index: Int =>
+        if (index >= compositeType.getArity) {
+          ValidationFailure(s"Field index '$index' exceeds arity.")
+        } else {
+          fieldIndex = Some(index)
+          ValidationSuccess
+        }
+      case _ =>
+        ValidationFailure(s"Invalid key '$key'.")
+    }
+  }
+
+  override private[flink] def resultType: TypeInformation[_] =
+    child.resultType.asInstanceOf[CompositeType[_]].getTypeAt(fieldIndex.get)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeFieldAccess(child.toRexNode, fieldIndex.get)
+  }
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
+    copy(child, key).asInstanceOf[this.type]
+  }
+
+  /**
+    * Gives a meaningful alias if possible (e.g. a$mypojo$field).
+    */
+  private[flink] def aliasName(): Option[String] = child match {
+    case gcf: GetCompositeField =>
+      val alias = gcf.aliasName()
+      if (alias.isDefined) {
+        Some(s"${alias.get}$$$key")
+      } else {
+        None
+      }
+    case c: ResolvedFieldReference =>
+      val keySuffix = if (key.isInstanceOf[Int]) s"_$key" else key
+      Some(s"${c.name}$$$keySuffix")
+    case _ => None
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
new file mode 100644
index 0000000..ffd526e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api._
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.functions.sql.StreamRecordTimestampSqlFunction
+import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+trait NamedExpression extends PlannerExpression {
+  private[flink] def name: String
+  private[flink] def toAttribute: Attribute
+}
+
+abstract class Attribute extends LeafExpression with NamedExpression {
+  override private[flink] def toAttribute: Attribute = this
+
+  private[flink] def withName(newName: String): Attribute
+}
+
+/**
+  * Dummy wrapper for expressions that were converted to RexNode in a different way.
+  */
+case class RexPlannerExpression(
+    private[flink] val rexNode: RexNode)
+  extends LeafExpression {
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    rexNode
+  }
+
+  override private[flink] def resultType: TypeInformation[_] =
+    fromLogicalTypeToTypeInfo(FlinkTypeFactory.toLogicalType(rexNode.getType))
+}
+
+case class UnresolvedFieldReference(name: String) extends Attribute {
+
+  override def toString = s"'$name"
+
+  override private[flink] def withName(newName: String): Attribute =
+    UnresolvedFieldReference(newName)
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Calling resultType on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ValidationResult =
+    ValidationFailure(s"Unresolved reference $name.")
+}
+
+case class PlannerResolvedFieldReference(
+    name: String,
+    resultType: TypeInformation[_]) extends Attribute with ResolvedFieldReference {
+
+  override def toString = s"'$name"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.field(name)
+  }
+
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == name) {
+      this
+    } else {
+      PlannerResolvedFieldReference(newName, resultType)
+    }
+  }
+}
+
+case class Alias(child: PlannerExpression, name: String, extraNames: Seq[String] = Seq())
+    extends UnaryExpression with NamedExpression {
+
+  override def toString = s"$child as '$name"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.alias(child.toRexNode, name)
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
+    copy(child, name, extraNames).asInstanceOf[this.type]
+  }
+
+  override private[flink] def toAttribute: Attribute = {
+    if (valid) {
+      PlannerResolvedFieldReference(name, child.resultType)
+    } else {
+      UnresolvedFieldReference(name)
+    }
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (name == "*") {
+      ValidationFailure("Alias can not accept '*' as name.")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+case class UnresolvedAlias(child: PlannerExpression) extends UnaryExpression with NamedExpression {
+
+  override private[flink] def name: String =
+    throw UnresolvedException("Invalid call to name on UnresolvedAlias")
+
+  override private[flink] def toAttribute: Attribute =
+    throw UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException("Invalid call to resultType on UnresolvedAlias")
+
+  override private[flink] lazy val valid = false
+}
+
+case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute {
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("A window reference can not be used solely.")
+
+  override private[flink] def resultType: TypeInformation[_] =
+    tpe.getOrElse(throw UnresolvedException("Could not resolve type of referenced window."))
+
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == name) {
+      this
+    } else {
+      throw new ValidationException("Cannot rename window reference.")
+    }
+  }
+
+  override def toString: String = s"'$name"
+}
+
+case class TableReference(name: String, tableOperation: QueryOperation)
+  extends LeafExpression
+  with NamedExpression {
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException(s"Table reference '$name' can not be used solely.")
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Table reference '$name' has no result type.")
+
+  override private[flink] def toAttribute =
+    throw new UnsupportedOperationException(s"A table reference '$name' can not be an attribute.")
+
+  override def toString: String = s"$name"
+}
+
+abstract class TimeAttribute(val expression: PlannerExpression)
+  extends UnaryExpression
+  with WindowProperty {
+
+  override private[flink] def child: PlannerExpression = expression
+}
+
+case class RowtimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) {
+
+  override private[flink] def validateInput(): ValidationResult = {
+    child match {
+      case WindowReference(_, Some(tpe: TypeInformation[_])) if isProctimeIndicatorType(tpe) =>
+        ValidationFailure("A proctime window cannot provide a rowtime attribute.")
+      case WindowReference(_, Some(tpe: TypeInformation[_])) if isRowtimeIndicatorType(tpe) =>
+        // rowtime window
+        ValidationSuccess
+      case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
+        // batch time window
+        ValidationSuccess
+      case WindowReference(_, _) =>
+        ValidationFailure("Reference to a rowtime or proctime window required.")
+      case any =>
+        ValidationFailure(
+          s"The '.rowtime' expression can only be used for table definitions and windows, " +
+            s"while [$any] was found.")
+    }
+  }
+
+  override def resultType: TypeInformation[_] = {
+    child match {
+      case WindowReference(_, Some(tpe: TypeInformation[_])) if isRowtimeIndicatorType(tpe) =>
+        // rowtime window
+        TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+      case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
+        // batch time window
+        Types.SQL_TIMESTAMP
+      case _ =>
+        throw new TableException("RowtimeAttribute has invalid type. Please report this bug.")
+    }
+  }
+
+  override def toNamedWindowProperty(name: String): NamedWindowProperty =
+    NamedWindowProperty(name, this)
+
+  override def toString: String = s"rowtime($child)"
+}
+
+case class ProctimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) {
+
+  override private[flink] def validateInput(): ValidationResult = {
+    child match {
+      case WindowReference(_, Some(tpe: TypeInformation[_])) if isTimeIndicatorType(tpe) =>
+        ValidationSuccess
+      case WindowReference(_, _) =>
+        ValidationFailure("Reference to a rowtime or proctime window required.")
+      case any =>
+        ValidationFailure(
+          "The '.proctime' expression can only be used for table definitions and windows, " +
+            s"while [$any] was found.")
+    }
+  }
+
+  override def resultType: TypeInformation[_] =
+    TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+
+  override def toNamedWindowProperty(name: String): NamedWindowProperty =
+    NamedWindowProperty(name, this)
+
+  override def toString: String = s"proctime($child)"
+}
+
+/** Expression to access the timestamp of a StreamRecord. */
+case class StreamRecordTimestamp() extends LeafExpression {
+
+  override private[flink] def resultType = Types.LONG
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.getRexBuilder.makeCall(new StreamRecordTimestampSqlFunction)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/hashExpressions.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/hashExpressions.scala
new file mode 100644
index 0000000..b24d46c
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/hashExpressions.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+
+case class Md5(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+  override def toString: String = s"($child).md5()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.MD5, child.toRexNode)
+  }
+}
+
+case class Sha1(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+  override def toString: String = s"($child).sha1()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SHA1, child.toRexNode)
+  }
+}
+
+case class Sha224(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+  override def toString: String = s"($child).sha224()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SHA224, child.toRexNode)
+  }
+}
+
+case class Sha256(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+  override def toString: String = s"($child).sha256()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SHA256, child.toRexNode)
+  }
+}
+
+case class Sha384(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+  override def toString: String = s"($child).sha384()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SHA384, child.toRexNode)
+  }
+}
+
+case class Sha512(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+  override def toString: String = s"($child).sha512()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SHA512, child.toRexNode)
+  }
+}
+
+case class Sha2(child: PlannerExpression, hashLength: PlannerExpression)
+    extends BinaryExpression with InputTypeSpec {
+
+  override private[flink] def left = child
+  override private[flink] def right = hashLength
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    STRING_TYPE_INFO :: INT_TYPE_INFO :: Nil
+
+  override def toString: String = s"($child).sha2($hashLength)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SHA2, left.toRexNode, right.toRexNode)
+  }
+
+}
+
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala
new file mode 100644
index 0000000..b33ca66
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import java.sql.{Date, Time, Timestamp}
+import java.util.{Calendar, TimeZone}
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlIntervalQualifier
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.{DateString, TimeString, TimestampString}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+object Literal {
+  private[flink] val UTC = TimeZone.getTimeZone("UTC")
+
+  private[flink] def apply(l: Any): Literal = l match {
+    case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
+    case s: Short => Literal(s, BasicTypeInfo.SHORT_TYPE_INFO)
+    case b: Byte => Literal(b, BasicTypeInfo.BYTE_TYPE_INFO)
+    case l: Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
+    case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
+    case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
+    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
+    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+    case javaDec: java.math.BigDecimal => Literal(javaDec, BasicTypeInfo.BIG_DEC_TYPE_INFO)
+    case scalaDec: scala.math.BigDecimal =>
+      Literal(scalaDec.bigDecimal, BasicTypeInfo.BIG_DEC_TYPE_INFO)
+    case sqlDate: Date => Literal(sqlDate, SqlTimeTypeInfo.DATE)
+    case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
+    case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
+  }
+}
+
+case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
+  override def toString: String = resultType match {
+    case _: BasicTypeInfo[_] => value.toString
+    case _@SqlTimeTypeInfo.DATE => value.toString + ".toDate"
+    case _@SqlTimeTypeInfo.TIME => value.toString + ".toTime"
+    case _@SqlTimeTypeInfo.TIMESTAMP => value.toString + ".toTimestamp"
+    case _@TimeIntervalTypeInfo.INTERVAL_MILLIS => value.toString + ".millis"
+    case _@TimeIntervalTypeInfo.INTERVAL_MONTHS => value.toString + ".months"
+    case _ => s"Literal($value, $resultType)"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    resultType match {
+      case BasicTypeInfo.BIG_DEC_TYPE_INFO =>
+        val bigDecValue = value.asInstanceOf[java.math.BigDecimal]
+        val decType = relBuilder.getTypeFactory.createSqlType(SqlTypeName.DECIMAL)
+        relBuilder.getRexBuilder.makeExactLiteral(bigDecValue, decType)
+
+      // create BIGINT literals for long type
+      case BasicTypeInfo.LONG_TYPE_INFO =>
+        val bigint = java.math.BigDecimal.valueOf(value.asInstanceOf[Long])
+        relBuilder.getRexBuilder.makeBigintLiteral(bigint)
+
+      // date/time
+      case SqlTimeTypeInfo.DATE =>
+        val datestr = DateString.fromCalendarFields(valueAsCalendar)
+        relBuilder.getRexBuilder.makeDateLiteral(datestr)
+      case SqlTimeTypeInfo.TIME =>
+        val timestr = TimeString.fromCalendarFields(valueAsCalendar)
+        relBuilder.getRexBuilder.makeTimeLiteral(timestr, 0)
+      case SqlTimeTypeInfo.TIMESTAMP =>
+        val timestampstr = TimestampString.fromCalendarFields(valueAsCalendar)
+        relBuilder.getRexBuilder.makeTimestampLiteral(timestampstr, 3)
+
+      case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
+        val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Int])
+        val intervalQualifier = new SqlIntervalQualifier(
+          TimeUnit.YEAR,
+          TimeUnit.MONTH,
+          SqlParserPos.ZERO)
+        relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
+
+      case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
+        val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Long])
+        val intervalQualifier = new SqlIntervalQualifier(
+          TimeUnit.DAY,
+          TimeUnit.SECOND,
+          SqlParserPos.ZERO)
+        relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
+
+      case _ => relBuilder.literal(value)
+    }
+  }
+
+  /**
+    * Convert a Date value to a Calendar. Calcite's fromCalendarField functions use the
+    * Calendar.get methods, so the raw values of the individual fields are preserved when
+    * converted to the String formats.
+    *
+    * @return get the Calendar value
+    */
+  private def valueAsCalendar: Calendar = {
+    val date = value.asInstanceOf[java.util.Date]
+    val cal = Calendar.getInstance
+    cal.setTime(date)
+    cal
+  }
+}
+
+@deprecated(
+  "Use nullOf(TypeInformation) instead. It is available through the implicit Scala DSL.",
+  "1.8.0")
+case class Null(resultType: TypeInformation[_]) extends LeafExpression {
+  override def toString = s"null"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val rexBuilder = relBuilder.getRexBuilder
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    rexBuilder
+      .makeCast(
+        typeFactory.createFieldTypeFromLogicalType(
+          TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType(resultType)),
+        rexBuilder.constantNull())
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/logic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/logic.scala
new file mode 100644
index 0000000..838261c
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/logic.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.validate._
+
+abstract class BinaryPredicate extends BinaryExpression {
+  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (left.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        right.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$this only accepts children of Boolean type, " +
+        s"get $left : ${left.resultType} and $right : ${right.resultType}")
+    }
+  }
+}
+
+case class Not(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString = s"!($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.not(child.toRexNode)
+  }
+
+  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Not operator requires a boolean expression as input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+}
+
+case class And(left: PlannerExpression, right: PlannerExpression) extends BinaryPredicate {
+
+  override def toString = s"$left && $right"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.and(left.toRexNode, right.toRexNode)
+  }
+}
+
+case class Or(left: PlannerExpression, right: PlannerExpression) extends BinaryPredicate {
+
+  override def toString = s"$left || $right"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.or(left.toRexNode, right.toRexNode)
+  }
+}
+
+@deprecated(
+  "Use ifThenElse(...) instead. It is available through the implicit Scala DSL.",
+  "1.8.0")
+case class If(
+    condition: PlannerExpression,
+    ifTrue: PlannerExpression,
+    ifFalse: PlannerExpression)
+  extends PlannerExpression {
+  private[flink] def children = Seq(condition, ifTrue, ifFalse)
+
+  override private[flink] def resultType = ifTrue.resultType
+
+  override def toString = s"($condition)? $ifTrue : $ifFalse"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val c = condition.toRexNode
+    val t = ifTrue.toRexNode
+    val f = ifFalse.toRexNode
+    relBuilder.call(FlinkSqlOperatorTable.CASE, c, t, f)
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (condition.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        ifTrue.resultType == ifFalse.resultType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"If should have boolean condition and same type of ifTrue and ifFalse, get " +
+          s"(${condition.resultType}, ${ifTrue.resultType}, ${ifFalse.resultType})")
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
new file mode 100644
index 0000000..d52adfb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+case class Abs(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Abs")
+
+  override def toString: String = s"abs($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.ABS, child.toRexNode)
+  }
+}
+
+case class Ceil(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Ceil")
+
+  override def toString: String = s"ceil($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.CEIL, child.toRexNode)
+  }
+}
+
+case class Exp(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString: String = s"exp($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.EXP, child.toRexNode)
+  }
+}
+
+
+case class Floor(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Floor")
+
+  override def toString: String = s"floor($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.FLOOR, child.toRexNode)
+  }
+}
+
+case class Log10(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString: String = s"log10($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.LOG10, child.toRexNode)
+  }
+}
+
+case class Log2(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
+    relBuilder.call(FlinkSqlOperatorTable.LOG2, child.toRexNode)
+  }
+
+  override def toString: String = s"log2($child)"
+}
+
+case class Cosh(child: PlannerExpression) extends UnaryExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
+    relBuilder.call(FlinkSqlOperatorTable.COSH, child.toRexNode)
+  }
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Cosh")
+
+  override def toString = s"cosh($child)"
+}
+
+case class Log(base: PlannerExpression, antilogarithm: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+  def this(antilogarithm: PlannerExpression) = this(E(), antilogarithm)
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    if (base == null) Seq(antilogarithm) else Seq(base, antilogarithm)
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq.fill(children.length)(DOUBLE_TYPE_INFO)
+
+  override def toString: String = s"log(${children.mkString(",")})"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.LOG, children.map(_.toRexNode))
+  }
+}
+
+object Log {
+  def apply(antilogarithm: PlannerExpression): Log = Log(null, antilogarithm)
+}
+
+case class Ln(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString: String = s"ln($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.LN, child.toRexNode)
+  }
+}
+
+case class Power(left: PlannerExpression, right: PlannerExpression)
+  extends BinaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+
+  override def toString: String = s"pow($left, $right)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.POWER, left.toRexNode, right.toRexNode)
+  }
+}
+
+case class Sinh(child: PlannerExpression) extends UnaryExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO;
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Sinh")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
+    relBuilder.call(FlinkSqlOperatorTable.SINH, child.toRexNode)
+  }
+
+  override def toString = s"sinh($child)"
+}
+
+case class Sqrt(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(DOUBLE_TYPE_INFO)
+
+  override def toString: String = s"sqrt($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.POWER, child.toRexNode, Literal(0.5).toRexNode)
+  }
+}
+
+case class Sin(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Sin")
+
+  override def toString: String = s"sin($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SIN, child.toRexNode)
+  }
+}
+
+case class Cos(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Cos")
+
+  override def toString: String = s"cos($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.COS, child.toRexNode)
+  }
+}
+
+case class Tan(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Tan")
+
+  override def toString: String = s"tan($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.TAN, child.toRexNode)
+  }
+}
+
+case class Tanh(child: PlannerExpression) extends UnaryExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
+    relBuilder.call(FlinkSqlOperatorTable.TANH, child.toRexNode)
+  }
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Tanh")
+
+  override def toString = s"tanh($child)"
+}
+
+case class Cot(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Cot")
+
+  override def toString: String = s"cot($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.COT, child.toRexNode)
+  }
+}
+
+case class Asin(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Asin")
+
+  override def toString: String = s"asin($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.ASIN, child.toRexNode)
+  }
+}
+
+case class Acos(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Acos")
+
+  override def toString: String = s"acos($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.ACOS, child.toRexNode)
+  }
+}
+
+case class Atan(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Atan")
+
+  override def toString: String = s"atan($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.ATAN, child.toRexNode)
+  }
+}
+
+case class Atan2(y: PlannerExpression, x: PlannerExpression) extends BinaryExpression {
+
+  override private[flink] def left = y
+
+  override private[flink] def right = x
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput() = {
+    TypeInfoCheckUtils.assertNumericExpr(y.resultType, "atan2")
+    TypeInfoCheckUtils.assertNumericExpr(x.resultType, "atan2")
+  }
+
+  override def toString: String = s"atan2($left, $right)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.ATAN2, left.toRexNode, right.toRexNode)
+  }
+}
+
+case class Degrees(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Degrees")
+
+  override def toString: String = s"degrees($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.DEGREES, child.toRexNode)
+  }
+}
+
+case class Radians(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Radians")
+
+  override def toString: String = s"radians($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.RADIANS, child.toRexNode)
+  }
+}
+
+case class Sign(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "sign")
+
+  override def toString: String = s"sign($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SIGN, child.toRexNode)
+  }
+}
+
+case class Round(left: PlannerExpression, right: PlannerExpression)
+  extends BinaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = left.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isInteger(right.resultType)) {
+      ValidationFailure(s"round right requires int, get " +
+        s"$right : ${right.resultType}")
+    }
+    TypeInfoCheckUtils.assertNumericExpr(left.resultType, s"round left :$left")
+  }
+
+  override def toString: String = s"round($left, $right)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.ROUND, left.toRexNode, right.toRexNode)
+  }
+}
+
+case class Pi() extends LeafExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def toString: String = s"pi()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.PI)
+  }
+}
+
+case class E() extends LeafExpression {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def toString: String = s"e()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.E)
+  }
+}
+
+case class Rand(seed: PlannerExpression) extends PlannerExpression with InputTypeSpec {
+
+  def this() = this(null)
+
+  override private[flink] def children: Seq[PlannerExpression] = if (seed != null) {
+    seed :: Nil
+  } else {
+    Nil
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = if (seed != null) {
+    INT_TYPE_INFO :: Nil
+  } else {
+    Nil
+  }
+
+  override def toString: String = if (seed != null) {
+    s"rand($seed)"
+  } else {
+    s"rand()"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.RAND, children.map(_.toRexNode))
+  }
+}
+
+case class RandInteger(seed: PlannerExpression, bound: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  def this(bound: PlannerExpression) = this(null, bound)
+
+  override private[flink] def children: Seq[PlannerExpression] = if (seed != null) {
+    seed :: bound :: Nil
+  } else {
+    bound :: Nil
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.INT_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = if (seed != null) {
+    INT_TYPE_INFO :: INT_TYPE_INFO :: Nil
+  } else {
+    INT_TYPE_INFO :: Nil
+  }
+
+  override def toString: String = if (seed != null) {
+    s"randInteger($seed, $bound)"
+  } else {
+    s"randInteger($bound)"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.RAND_INTEGER, children.map(_.toRexNode))
+  }
+}
+
+case class Bin(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertIntegerFamilyExpr(child.resultType, "Bin")
+
+  override def toString: String = s"bin($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.BIN, child.toRexNode)
+  }
+}
+
+case class Hex(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (TypeInfoCheckUtils.isIntegerFamily(child.resultType) ||
+        TypeInfoCheckUtils.isString(child.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"hex() requires an integer or string input but was '${child.resultType}'.")
+    }
+  }
+
+  override def toString: String = s"hex($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.HEX, child.toRexNode)
+  }
+}
+
+case class UUID() extends LeafExpression {
+  override private[flink] def resultType = BasicTypeInfo.STRING_TYPE_INFO
+
+  override def toString: String = s"uuid()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.UUID)
+  }
+}
+
+case class Truncate(base: PlannerExpression, num: PlannerExpression)
+    extends PlannerExpression with InputTypeSpec {
+  def this(base: PlannerExpression) = this(base, null)
+
+  override private[flink] def resultType: TypeInformation[_] = base.resultType
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    if (num == null) Seq(base) else Seq(base, num)
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    if (num == null) Seq(base.resultType) else Seq(base.resultType, INT_TYPE_INFO)
+
+  override def toString: String = s"truncate(${children.mkString(",")})"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.TRUNCATE, children.map(_.toRexNode))
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (num != null) {
+      if (!TypeInfoCheckUtils.isInteger(num.resultType)) {
+        ValidationFailure(s"truncate num requires int, get " +
+          s"$num : ${num.resultType}")
+      }
+    }
+    TypeInfoCheckUtils.assertNumericExpr(base.resultType, s"truncate base :$base")
+  }
+}
+
+object Truncate {
+  def apply(base: PlannerExpression): Truncate = Truncate(base, null)
+}
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ordering.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ordering.scala
new file mode 100644
index 0000000..2b668d9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ordering.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.validate._
+
+abstract class Ordering extends UnaryExpression {
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!child.isInstanceOf[NamedExpression]) {
+      ValidationFailure(s"Sort should only based on field reference")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+case class Asc(child: PlannerExpression) extends Ordering {
+  override def toString: String = s"($child).asc"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    child.toRexNode
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+}
+
+case class Desc(child: PlannerExpression) extends Ordering {
+  override def toString: String = s"($child).desc"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.desc(child.toRexNode)
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
new file mode 100644
index 0000000..ce1966f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+case class CurrentRow() extends PlannerExpression {
+  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
+
+  override private[flink] def children = Seq()
+
+  override def toString = "CURRENT ROW"
+}
+
+case class CurrentRange() extends PlannerExpression {
+  override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+  override private[flink] def children = Seq()
+
+  override def toString = "CURRENT RANGE"
+}
+
+case class UnboundedRow() extends PlannerExpression {
+  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
+
+  override private[flink] def children = Seq()
+
+  override def toString = "UNBOUNDED ROW"
+}
+
+case class UnboundedRange() extends PlannerExpression {
+  override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+  override private[flink] def children = Seq()
+
+  override def toString = "UNBOUNDED RANGE"
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/package.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/package.scala
new file mode 100644
index 0000000..41e0c9f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+/**
+ * This package contains the base class of AST nodes and all the expression language AST classes.
+ * Expression trees should not be manually constructed by users. They are implicitly constructed
+ * from the implicit DSL conversions in
+ * [[org.apache.flink.table.api.scala.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]]. For the Java API,
+ * expression trees should be generated from a string parser that parses expressions and creates
+ * AST nodes.
+ */
+package object expressions
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
new file mode 100644
index 0000000..9ef0252
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.expressions.PlannerTrimMode.PlannerTrimMode
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+/**
+  * Returns the length of this `str`.
+  */
+case class CharLength(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"CharLength operator requires String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override def toString: String = s"($child).charLength()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.CHAR_LENGTH, child.toRexNode)
+  }
+}
+
+/**
+  * Returns str with the first letter of each word in uppercase.
+  * All other letters are in lowercase. Words are delimited by white space.
+  */
+case class InitCap(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"InitCap operator requires String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override def toString: String = s"($child).initCap()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.INITCAP, child.toRexNode)
+  }
+}
+
+/**
+  * Returns true if `str` matches `pattern`.
+  */
+case class Like(str: PlannerExpression, pattern: PlannerExpression) extends BinaryExpression {
+  private[flink] def left: PlannerExpression = str
+  private[flink] def right: PlannerExpression = pattern
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Like operator requires (String, String) input, " +
+        s"but ($str, $pattern) is of type (${str.resultType}, ${pattern.resultType})")
+    }
+  }
+
+  override def toString: String = s"($str).like($pattern)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.LIKE, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Returns str with all characters changed to lowercase.
+  */
+case class Lower(child: PlannerExpression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Lower operator requires String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override def toString: String = s"($child).lowerCase()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.LOWER, child.toRexNode)
+  }
+}
+
+/**
+  * Returns true if `str` is similar to `pattern`.
+  */
+case class Similar(str: PlannerExpression, pattern: PlannerExpression) extends BinaryExpression {
+  private[flink] def left: PlannerExpression = str
+  private[flink] def right: PlannerExpression = pattern
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Similar operator requires (String, String) input, " +
+        s"but ($str, $pattern) is of type (${str.resultType}, ${pattern.resultType})")
+    }
+  }
+
+  override def toString: String = s"($str).similarTo($pattern)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SIMILAR_TO, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Returns substring of `str` from `begin`(inclusive) for `length`.
+  */
+case class Substring(
+    str: PlannerExpression,
+    begin: PlannerExpression,
+    length: PlannerExpression) extends PlannerExpression with InputTypeSpec {
+
+  def this(str: PlannerExpression, begin: PlannerExpression) = this(str, begin, CharLength(str))
+
+  override private[flink] def children: Seq[PlannerExpression] = str :: begin :: length :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  override def toString: String = s"($str).substring($begin, $length)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.SUBSTRING, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Trim `trimString` from `str` according to `trimMode`.
+  */
+case class Trim(
+    trimMode: PlannerExpression,
+    trimString: PlannerExpression,
+    str: PlannerExpression) extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    trimMode :: trimString :: str :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    trimMode match {
+      case SymbolPlannerExpression(_: PlannerTrimMode) =>
+        if (trimString.resultType != STRING_TYPE_INFO) {
+          ValidationFailure(s"String expected for trimString, get ${trimString.resultType}")
+        } else if (str.resultType != STRING_TYPE_INFO) {
+          ValidationFailure(s"String expected for str, get ${str.resultType}")
+        } else {
+          ValidationSuccess
+        }
+      case _ => ValidationFailure("TrimMode symbol expected.")
+    }
+  }
+
+  override def toString: String = s"($str).trim($trimMode, $trimString)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.TRIM, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Enumeration of trim flags.
+  */
+object TrimConstants {
+  val TRIM_DEFAULT_CHAR = Literal(" ")
+}
+
+/**
+  * Returns str with all characters changed to uppercase.
+  */
+case class Upper(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO)
+
+  override def toString: String = s"($child).upperCase()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.UPPER, child.toRexNode)
+  }
+}
+
+/**
+  * Returns the position of string needle in string haystack.
+  */
+case class Position(needle: PlannerExpression, haystack: PlannerExpression)
+    extends PlannerExpression with InputTypeSpec {
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(needle, haystack)
+
+  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override def toString: String = s"($needle).position($haystack)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.POSITION, needle.toRexNode, haystack.toRexNode)
+  }
+}
+
+/**
+  * Replaces a substring of a string with a replacement string.
+  * Starting at a position for a given length.
+  */
+case class Overlay(
+    str: PlannerExpression,
+    replacement: PlannerExpression,
+    starting: PlannerExpression,
+    position: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  def this(str: PlannerExpression, replacement: PlannerExpression, starting: PlannerExpression) =
+    this(str, replacement, starting, CharLength(replacement))
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    Seq(str, replacement, starting, position)
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  override def toString: String = s"($str).overlay($replacement, $starting, $position)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(
+      FlinkSqlOperatorTable.OVERLAY,
+      str.toRexNode,
+      replacement.toRexNode,
+      starting.toRexNode,
+      position.toRexNode)
+  }
+}
+
+/**
+  * Returns the string that results from concatenating the arguments.
+  * Returns NULL if any argument is NULL.
+  */
+case class Concat(strings: Seq[PlannerExpression]) extends PlannerExpression with InputTypeSpec {
+
+  override private[flink] def children: Seq[PlannerExpression] = strings
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    children.map(_ => STRING_TYPE_INFO)
+
+  override def toString: String = s"concat($strings)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.CONCAT, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Returns the string that results from concatenating the arguments and separator.
+  * Returns NULL If the separator is NULL.
+  *
+  * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
+  * values after the separator argument.
+  **/
+case class ConcatWs(separator: PlannerExpression, strings: Seq[PlannerExpression])
+  extends PlannerExpression with InputTypeSpec {
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(separator) ++ strings
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    children.map(_ => STRING_TYPE_INFO)
+
+  override def toString: String = s"concat_ws($separator, $strings)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.CONCAT_WS, children.map(_.toRexNode))
+  }
+}
+
+case class Lpad(text: PlannerExpression, len: PlannerExpression, pad: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(text, len, pad)
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+
+  override def toString: String = s"($text).lpad($len, $pad)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.LPAD, children.map(_.toRexNode))
+  }
+}
+
+case class Rpad(text: PlannerExpression, len: PlannerExpression, pad: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(text, len, pad)
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+
+  override def toString: String = s"($text).rpad($len, $pad)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.RPAD, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Returns a string with all substrings that match the regular expression consecutively
+  * being replaced.
+  */
+case class RegexpReplace(
+    str: PlannerExpression,
+    regex: PlannerExpression,
+    replacement: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(str, regex, replacement)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.REGEXP_REPLACE, children.map(_.toRexNode))
+  }
+
+  override def toString: String = s"($str).regexp_replace($regex, $replacement)"
+}
+
+/**
+  * Returns a string extracted with a specified regular expression and a regex match group index.
+  */
+case class RegexpExtract(
+    str: PlannerExpression,
+    regex: PlannerExpression,
+    extractIndex: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+  def this(str: PlannerExpression, regex: PlannerExpression) = this(str, regex, null)
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = {
+    if (extractIndex == null) {
+      Seq(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+    } else {
+      Seq(
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.INT_TYPE_INFO)
+    }
+  }
+
+  override private[flink] def children: Seq[PlannerExpression] = {
+    if (extractIndex == null) {
+      Seq(str, regex)
+    } else {
+      Seq(str, regex, extractIndex)
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.REGEXP_EXTRACT, children.map(_.toRexNode))
+  }
+
+  override def toString: String = s"($str).regexp_extract($regex, $extractIndex)"
+}
+
+object RegexpExtract {
+  def apply(str: PlannerExpression, regex: PlannerExpression): RegexpExtract =
+    RegexpExtract(str, regex, null)
+}
+
+/**
+  * Returns the base string decoded with base64.
+  * Returns NULL If the input string is NULL.
+  */
+case class FromBase64(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"FromBase64 operator requires String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.FROM_BASE64, child.toRexNode)
+  }
+
+  override def toString: String = s"($child).fromBase64"
+
+}
+
+/**
+  * Returns the base64-encoded result of the input string.
+  */
+case class ToBase64(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"ToBase64 operator requires a String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.TO_BASE64, child.toRexNode)
+  }
+
+  override def toString: String = s"($child).toBase64"
+
+}
+
+/**
+  * Returns a string that removes the left whitespaces from the given string.
+  */
+case class LTrim(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"LTrim operator requires a String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.LTRIM, child.toRexNode)
+  }
+
+  override def toString = s"($child).ltrim"
+}
+
+/**
+  * Returns a string that removes the right whitespaces from the given string.
+  */
+case class RTrim(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"RTrim operator requires a String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.RTRIM, child.toRexNode)
+  }
+
+  override def toString = s"($child).rtrim"
+}
+
+/**
+  * Returns a string that repeats the base str n times.
+  */
+case class Repeat(str: PlannerExpression, n: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(str, n)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.REPEAT, str.toRexNode, n.toRexNode)
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && n.resultType == INT_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Repeat operator requires (String, Int) input, " +
+        s"but ($str, $n) is of type (${str.resultType}, ${n.resultType})")
+    }
+  }
+
+  override def toString: String = s"($str).repeat($n)"
+}
+
+/**
+  * Returns a new string which replaces all the occurrences of the search target
+  * with the replacement string (non-overlapping).
+  */
+case class Replace(
+    str: PlannerExpression,
+    search: PlannerExpression,
+    replacement: PlannerExpression) extends PlannerExpression with InputTypeSpec {
+
+  def this(str: PlannerExpression, begin: PlannerExpression) = this(str, begin, CharLength(str))
+
+  override private[flink] def children: Seq[PlannerExpression] = str :: search :: replacement :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override def toString: String = s"($str).replace($search, $replacement)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.REPLACE, children.map(_.toRexNode))
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/subquery.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/subquery.scala
new file mode 100644
index 0000000..694e0ea
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/subquery.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rex.{RexNode, RexSubQuery}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils._
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+case class In(expression: PlannerExpression, elements: Seq[PlannerExpression])
+  extends PlannerExpression  {
+
+  override def toString = s"$expression.in(${elements.mkString(", ")})"
+
+  override private[flink] def children: Seq[PlannerExpression] = expression +: elements.distinct
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // check if this is a sub-query expression or an element list
+    elements.head match {
+
+      case TableReference(_, tableOperation: QueryOperation) =>
+        RexSubQuery.in(
+          relBuilder.asInstanceOf[FlinkRelBuilder].queryOperation(tableOperation).build(),
+          ImmutableList.of(expression.toRexNode))
+
+      case _ =>
+        relBuilder.call(FlinkSqlOperatorTable.IN, children.map(_.toRexNode): _*)
+    }
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    // check if this is a sub-query expression or an element list
+    elements.head match {
+
+      case TableReference(name, tableOperation: QueryOperation) =>
+        if (elements.length != 1) {
+          return ValidationFailure("IN operator supports only one table reference.")
+        }
+        val tableSchema = tableOperation.getTableSchema
+        if (tableSchema.getFieldCount > 1) {
+          return ValidationFailure(
+            s"The sub-query table '$name' must not have more than one column.")
+        }
+        (expression.resultType, tableSchema.getFieldType(0).get()) match {
+          case (lType, rType) if lType == rType => ValidationSuccess
+          case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+          case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
+            ValidationSuccess
+          case (lType, rType) =>
+            ValidationFailure(s"IN operator on incompatible types: $lType and $rType.")
+        }
+
+      case _ =>
+        val types = children.tail.map(_.resultType)
+        if (types.distinct.length != 1) {
+          return ValidationFailure(
+            s"Types on the right side of the IN operator must be the same, " +
+              s"got ${types.mkString(", ")}.")
+        }
+        (children.head.resultType, children.last.resultType) match {
+          case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+          case (lType, rType) if lType == rType => ValidationSuccess
+          case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
+            ValidationSuccess
+          case (lType, rType) =>
+            ValidationFailure(s"IN operator on incompatible types: $lType and $rType.")
+        }
+    }
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+}
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/symbols.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/symbols.scala
new file mode 100644
index 0000000..48135a3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/symbols.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlTrimFunction
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.language.{existentials, implicitConversions}
+
+/**
+  * General expression class to represent a symbol.
+  */
+case class SymbolPlannerExpression(symbol: PlannerSymbol) extends LeafExpression {
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw new UnsupportedOperationException("This should not happen. A symbol has no result type.")
+
+  def toExpr: SymbolPlannerExpression = this // triggers implicit conversion
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // dirty hack to pass Java enums to Java from Scala
+    val enum = symbol.enum.asInstanceOf[Enum[T] forSome { type T <: Enum[T] }]
+    relBuilder.getRexBuilder.makeFlag(enum)
+  }
+
+  override def toString: String = s"${symbol.symbols}.${symbol.name}"
+
+}
+
+/**
+  * Symbol that wraps a Calcite symbol in form of a Java enum.
+  */
+trait PlannerSymbol {
+  def symbols: PlannerSymbols
+  def name: String
+  def enum: Enum[_]
+}
+
+/**
+  * Enumeration of symbols.
+  */
+abstract class PlannerSymbols extends Enumeration {
+
+  class PlannerSymbolValue(e: Enum[_]) extends Val(e.name()) with PlannerSymbol {
+    override def symbols: PlannerSymbols = PlannerSymbols.this
+
+    override def enum: Enum[_] = e
+
+    override def name: String = toString()
+  }
+
+  protected final def Value(enum: Enum[_]): PlannerSymbolValue = new PlannerSymbolValue(enum)
+
+  implicit def symbolToExpression(symbol: PlannerSymbolValue): SymbolPlannerExpression =
+    SymbolPlannerExpression(symbol)
+
+}
+
+/**
+  * Units for working with time intervals.
+  */
+object PlannerTimeIntervalUnit extends PlannerSymbols {
+
+  type PlannerTimeIntervalUnit = PlannerSymbolValue
+
+  val YEAR = Value(TimeUnitRange.YEAR)
+  val YEAR_TO_MONTH = Value(TimeUnitRange.YEAR_TO_MONTH)
+  val QUARTER = Value(TimeUnitRange.QUARTER)
+  val MONTH = Value(TimeUnitRange.MONTH)
+  val WEEK = Value(TimeUnitRange.WEEK)
+  val DAY = Value(TimeUnitRange.DAY)
+  val DAY_TO_HOUR = Value(TimeUnitRange.DAY_TO_HOUR)
+  val DAY_TO_MINUTE = Value(TimeUnitRange.DAY_TO_MINUTE)
+  val DAY_TO_SECOND = Value(TimeUnitRange.DAY_TO_SECOND)
+  val HOUR = Value(TimeUnitRange.HOUR)
+  val HOUR_TO_MINUTE = Value(TimeUnitRange.HOUR_TO_MINUTE)
+  val HOUR_TO_SECOND = Value(TimeUnitRange.HOUR_TO_SECOND)
+  val MINUTE = Value(TimeUnitRange.MINUTE)
+  val MINUTE_TO_SECOND = Value(TimeUnitRange.MINUTE_TO_SECOND)
+  val SECOND = Value(TimeUnitRange.SECOND)
+
+}
+
+/**
+  * Units for working with time points.
+  */
+object PlannerTimePointUnit extends PlannerSymbols {
+
+  type PlannerTimePointUnit = PlannerSymbolValue
+
+  val YEAR = Value(TimeUnit.YEAR)
+  val MONTH = Value(TimeUnit.MONTH)
+  val DAY = Value(TimeUnit.DAY)
+  val HOUR = Value(TimeUnit.HOUR)
+  val MINUTE = Value(TimeUnit.MINUTE)
+  val SECOND = Value(TimeUnit.SECOND)
+  val QUARTER = Value(TimeUnit.QUARTER)
+  val WEEK = Value(TimeUnit.WEEK)
+  val MILLISECOND = Value(TimeUnit.MILLISECOND)
+  val MICROSECOND = Value(TimeUnit.MICROSECOND)
+
+}
+
+/**
+  * Modes for trimming strings.
+  */
+object PlannerTrimMode extends PlannerSymbols {
+
+  type PlannerTrimMode = PlannerSymbolValue
+
+  val BOTH = Value(SqlTrimFunction.Flag.BOTH)
+  val LEADING = Value(SqlTrimFunction.Flag.LEADING)
+  val TRAILING = Value(SqlTrimFunction.Flag.TRAILING)
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/time.scala
new file mode 100644
index 0000000..dee618b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex._
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.expressions.PlannerTimeIntervalUnit.PlannerTimeIntervalUnit
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeInfoCheckUtils}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConversions._
+
+case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = timeIntervalUnit :: temporal :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isTemporal(temporal.resultType)) {
+      return ValidationFailure(s"Extract operator requires Temporal input, " +
+        s"but $temporal is of type ${temporal.resultType}")
+    }
+
+    timeIntervalUnit match {
+      case SymbolPlannerExpression(PlannerTimeIntervalUnit.YEAR)
+           | SymbolPlannerExpression(PlannerTimeIntervalUnit.QUARTER)
+           | SymbolPlannerExpression(PlannerTimeIntervalUnit.MONTH)
+           | SymbolPlannerExpression(PlannerTimeIntervalUnit.WEEK)
+           | SymbolPlannerExpression(PlannerTimeIntervalUnit.DAY)
+        if temporal.resultType == SqlTimeTypeInfo.DATE
+          || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS
+          || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS =>
+        ValidationSuccess
+
+      case SymbolPlannerExpression(PlannerTimeIntervalUnit.HOUR)
+           | SymbolPlannerExpression(PlannerTimeIntervalUnit.MINUTE)
+           | SymbolPlannerExpression(PlannerTimeIntervalUnit.SECOND)
+        if temporal.resultType == SqlTimeTypeInfo.TIME
+          || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS =>
+        ValidationSuccess
+
+      case _ =>
+        ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input" +
+          s" of type '${temporal.resultType}'.")
+    }
+  }
+
+  override def toString: String = s"($temporal).extract($timeIntervalUnit)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(
+        FlinkSqlOperatorTable.EXTRACT,
+        Seq(timeIntervalUnit.toRexNode, temporal.toRexNode))
+  }
+}
+
+abstract class TemporalCeilFloor(
+    timeIntervalUnit: PlannerExpression,
+    temporal: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = timeIntervalUnit :: temporal :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = temporal.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isTimePoint(temporal.resultType)) {
+      return ValidationFailure(s"Temporal ceil/floor operator requires Time Point input, " +
+        s"but $temporal is of type ${temporal.resultType}")
+    }
+    val unit = timeIntervalUnit match {
+      case SymbolPlannerExpression(u: PlannerTimeIntervalUnit) => Some(u)
+      case _ => None
+    }
+    if (unit.isEmpty) {
+      return ValidationFailure(s"Temporal ceil/floor operator requires Time Interval Unit " +
+        s"input, but $timeIntervalUnit is of type ${timeIntervalUnit.resultType}")
+    }
+
+    (unit.get, temporal.resultType) match {
+      case (PlannerTimeIntervalUnit.YEAR | PlannerTimeIntervalUnit.MONTH,
+          SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) =>
+        ValidationSuccess
+      case (PlannerTimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) =>
+        ValidationSuccess
+      case (PlannerTimeIntervalUnit.HOUR | PlannerTimeIntervalUnit.MINUTE |
+          PlannerTimeIntervalUnit.SECOND, SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) =>
+        ValidationSuccess
+      case _ =>
+        ValidationFailure(s"Temporal ceil/floor operator does not support " +
+          s"unit '$timeIntervalUnit' for input of type '${temporal.resultType}'.")
+    }
+  }
+}
+
+case class TemporalFloor(
+    timeIntervalUnit: PlannerExpression,
+    temporal: PlannerExpression)
+  extends TemporalCeilFloor(
+    timeIntervalUnit,
+    temporal) {
+
+  override def toString: String = s"($temporal).floor($timeIntervalUnit)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.FLOOR, temporal.toRexNode, timeIntervalUnit.toRexNode)
+  }
+}
+
+case class TemporalCeil(
+    timeIntervalUnit: PlannerExpression,
+    temporal: PlannerExpression)
+  extends TemporalCeilFloor(
+    timeIntervalUnit,
+    temporal) {
+
+  override def toString: String = s"($temporal).ceil($timeIntervalUnit)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(FlinkSqlOperatorTable.CEIL, temporal.toRexNode, timeIntervalUnit.toRexNode)
+  }
+}
+
+abstract class CurrentTimePoint(
+    targetType: TypeInformation[_],
+    local: Boolean)
+  extends LeafExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = targetType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isTimePoint(targetType)) {
+      ValidationFailure(s"CurrentTimePoint operator requires Time Point target type, " +
+        s"but get $targetType.")
+    } else if (local && targetType == SqlTimeTypeInfo.DATE) {
+      ValidationFailure(s"Localized CurrentTimePoint operator requires Time or Timestamp target " +
+        s"type, but get $targetType.")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override def toString: String = if (local) {
+    s"local$targetType()"
+  } else {
+    s"current$targetType()"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operator = targetType match {
+      case SqlTimeTypeInfo.TIME if local => FlinkSqlOperatorTable.LOCALTIME
+      case SqlTimeTypeInfo.TIMESTAMP if local => FlinkSqlOperatorTable.LOCALTIMESTAMP
+      case SqlTimeTypeInfo.DATE => FlinkSqlOperatorTable.CURRENT_DATE
+      case SqlTimeTypeInfo.TIME => FlinkSqlOperatorTable.CURRENT_TIME
+      case SqlTimeTypeInfo.TIMESTAMP => FlinkSqlOperatorTable.CURRENT_TIMESTAMP
+    }
+    relBuilder.call(operator)
+  }
+}
+
+case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false)
+
+case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false)
+
+case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false)
+
+case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true)
+
+case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
+
+/**
+  * Determines whether two anchored time intervals overlap.
+  */
+case class TemporalOverlaps(
+    leftTimePoint: PlannerExpression,
+    leftTemporal: PlannerExpression,
+    rightTimePoint: PlannerExpression,
+    rightTemporal: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isTimePoint(leftTimePoint.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint to be of type " +
+        s"Time Point, but get ${leftTimePoint.resultType}.")
+    }
+    if (!TypeInfoCheckUtils.isTimePoint(rightTimePoint.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires rightTimePoint to be of " +
+        s"type Time Point, but get ${rightTimePoint.resultType}.")
+    }
+    if (leftTimePoint.resultType != rightTimePoint.resultType) {
+      return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint and " +
+        s"rightTimePoint to be of same type.")
+    }
+
+    // leftTemporal is point, then it must be comparable with leftTimePoint
+    if (TypeInfoCheckUtils.isTimePoint(leftTemporal.resultType)) {
+      if (leftTemporal.resultType != leftTimePoint.resultType) {
+        return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal and " +
+          s"leftTimePoint to be of same type if leftTemporal is of type Time Point.")
+      }
+    } else if (!isTimeInterval(leftTemporal.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal to be of " +
+        s"type Time Point or Time Interval.")
+    }
+
+    // rightTemporal is point, then it must be comparable with rightTimePoint
+    if (TypeInfoCheckUtils.isTimePoint(rightTemporal.resultType)) {
+      if (rightTemporal.resultType != rightTimePoint.resultType) {
+        return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal and " +
+          s"rightTimePoint to be of same type if rightTemporal is of type Time Point.")
+      }
+    } else if (!isTimeInterval(rightTemporal.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal to be of " +
+        s"type Time Point or Time Interval.")
+    }
+    ValidationSuccess
+  }
+
+  override def toString: String = s"temporalOverlaps(${children.mkString(", ")})"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    convertOverlaps(
+      leftTimePoint.toRexNode,
+      leftTemporal.toRexNode,
+      rightTimePoint.toRexNode,
+      rightTemporal.toRexNode,
+      relBuilder.asInstanceOf[FlinkRelBuilder])
+  }
+
+  /**
+    * Standard conversion of the OVERLAPS operator.
+    * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
+    */
+  private def convertOverlaps(
+      leftP: RexNode,
+      leftT: RexNode,
+      rightP: RexNode,
+      rightT: RexNode,
+      relBuilder: FlinkRelBuilder)
+    : RexNode = {
+    val convLeftT = convertOverlapsEnd(relBuilder, leftP, leftT, leftTemporal.resultType)
+    val convRightT = convertOverlapsEnd(relBuilder, rightP, rightT, rightTemporal.resultType)
+
+    // sort end points into start and end, such that (s0 <= e0) and (s1 <= e1).
+    val (s0, e0) = buildSwap(relBuilder, leftP, convLeftT)
+    val (s1, e1) = buildSwap(relBuilder, rightP, convRightT)
+
+    // (e0 >= s1) AND (e1 >= s0)
+    val leftPred = relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e0, s1)
+    val rightPred = relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e1, s0)
+    relBuilder.call(FlinkSqlOperatorTable.AND, leftPred, rightPred)
+  }
+
+  private def convertOverlapsEnd(
+      relBuilder: FlinkRelBuilder,
+      start: RexNode, end: RexNode,
+      endType: TypeInformation[_]) = {
+    if (isTimeInterval(endType)) {
+      relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, start, end)
+    } else {
+      end
+    }
+  }
+
+  private def buildSwap(relBuilder: FlinkRelBuilder, start: RexNode, end: RexNode) = {
+    val le = relBuilder.call(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, start, end)
+    val l = relBuilder.call(FlinkSqlOperatorTable.CASE, le, start, end)
+    val r = relBuilder.call(FlinkSqlOperatorTable.CASE, le, end, start)
+    (l, r)
+  }
+}
+
+case class DateFormat(timestamp: PlannerExpression, format: PlannerExpression)
+  extends PlannerExpression {
+  override private[flink] def children = timestamp :: format :: Nil
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
+    relBuilder.call(FlinkSqlOperatorTable.DATE_FORMAT, timestamp.toRexNode, format.toRexNode)
+
+  override def toString: String = s"$timestamp.dateFormat($format)"
+
+  override private[flink] def resultType = STRING_TYPE_INFO
+}
+
+case class TimestampDiff(
+    timePointUnit: PlannerExpression,
+    timePoint1: PlannerExpression,
+    timePoint2: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    timePointUnit :: timePoint1 :: timePoint2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isTimePoint(timePoint1.resultType)) {
+      return ValidationFailure(
+        s"$this requires an input time point type, " +
+        s"but timePoint1 is of type '${timePoint1.resultType}'.")
+    }
+
+    if (!TypeInfoCheckUtils.isTimePoint(timePoint2.resultType)) {
+      return ValidationFailure(
+        s"$this requires an input time point type, " +
+        s"but timePoint2 is of type '${timePoint2.resultType}'.")
+    }
+
+    timePointUnit match {
+      case SymbolPlannerExpression(PlannerTimePointUnit.YEAR)
+           | SymbolPlannerExpression(PlannerTimePointUnit.QUARTER)
+           | SymbolPlannerExpression(PlannerTimePointUnit.MONTH)
+           | SymbolPlannerExpression(PlannerTimePointUnit.WEEK)
+           | SymbolPlannerExpression(PlannerTimePointUnit.DAY)
+           | SymbolPlannerExpression(PlannerTimePointUnit.HOUR)
+           | SymbolPlannerExpression(PlannerTimePointUnit.MINUTE)
+           | SymbolPlannerExpression(PlannerTimePointUnit.SECOND)
+        if timePoint1.resultType == SqlTimeTypeInfo.DATE
+          || timePoint1.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || timePoint2.resultType == SqlTimeTypeInfo.DATE
+          || timePoint2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+        ValidationSuccess
+
+      case _ =>
+        ValidationFailure(s"$this operator does not support unit '$timePointUnit'" +
+            s" for input of type ('${timePoint1.resultType}', '${timePoint2.resultType}').")
+    }
+  }
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+    .getRexBuilder
+    .makeCall(FlinkSqlOperatorTable.TIMESTAMP_DIFF,
+       Seq(timePointUnit.toRexNode, timePoint2.toRexNode, timePoint1.toRexNode))
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
new file mode 100644
index 0000000..72b7c25
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
+
+trait WindowProperty {
+
+  def toNamedWindowProperty(name: String): NamedWindowProperty
+
+  def resultType: TypeInformation[_]
+
+}
+
+abstract class AbstractWindowProperty(child: PlannerExpression)
+  extends UnaryExpression
+  with WindowProperty {
+
+  override def toString = s"WindowProperty($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("WindowProperty cannot be transformed to RexNode.")
+
+  override private[flink] def validateInput() =
+    if (child.isInstanceOf[WindowReference]) {
+      ValidationSuccess
+    } else {
+      ValidationFailure("Child must be a window reference.")
+    }
+
+  def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this)
+}
+
+case class WindowStart(child: PlannerExpression) extends AbstractWindowProperty(child) {
+
+  override def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+  override def toString: String = s"start($child)"
+}
+
+case class WindowEnd(child: PlannerExpression) extends AbstractWindowProperty(child) {
+
+  override def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+  override def toString: String = s"end($child)"
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 082d17a..df19f5a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -128,6 +128,14 @@ object UserDefinedFunctionUtils {
     getParamClassesConsiderVarArgs(method.isVarArgs, method.getParameterTypes, expectedTypes.length)
   }
 
+  def getEvalMethodSignatureOption(
+      func: ScalarFunction,
+      expectedTypes: Array[LogicalType]): Option[Array[Class[_]]] = {
+    getEvalUserDefinedMethod(func, expectedTypes).map( method =>
+      getParamClassesConsiderVarArgs(
+        method.isVarArgs, method.getParameterTypes, expectedTypes.length))
+  }
+
   def getEvalMethodSignature(
       func: TableFunction[_],
       expectedTypes: Array[LogicalType]): Array[Class[_]] = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/TreeNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
new file mode 100644
index 0000000..0535aa4
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan
+
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils
+
+/**
+ * Generic base class for trees that can be transformed and traversed.
+ */
+abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A =>
+
+  /**
+   * List of child nodes that should be considered when doing transformations. Other values
+   * in the Product will not be transformed, only handed through.
+   */
+  private[flink] def children: Seq[A]
+
+  /**
+   * Tests for equality by first testing for reference equality.
+   */
+  private[flink] def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
+
+  /**
+    * Do tree transformation in post order.
+    */
+  private[flink] def postOrderTransform(rule: PartialFunction[A, A]): A = {
+    def childrenTransform(rule: PartialFunction[A, A]): A = {
+      var changed = false
+      val newArgs = productIterator.map {
+        case arg: TreeNode[_] if children.contains(arg) =>
+          val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
+          if (!(newChild fastEquals arg)) {
+            changed = true
+            newChild
+          } else {
+            arg
+          }
+        case args: Traversable[_] => args.map {
+          case arg: TreeNode[_] if children.contains(arg) =>
+            val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
+            if (!(newChild fastEquals arg)) {
+              changed = true
+              newChild
+            } else {
+              arg
+            }
+          case other => other
+        }
+        case nonChild: AnyRef => nonChild
+        case null => null
+      }.toArray
+      if (changed) makeCopy(newArgs) else this
+    }
+
+    val afterChildren = childrenTransform(rule)
+    if (afterChildren fastEquals this) {
+      rule.applyOrElse(this, identity[A])
+    } else {
+      rule.applyOrElse(afterChildren, identity[A])
+    }
+  }
+
+  /**
+    * Runs the given function first on the node and then recursively on all its children.
+    */
+  private[flink] def preOrderVisit(f: A => Unit): Unit = {
+    f(this)
+    children.foreach(_.preOrderVisit(f))
+  }
+
+  /**
+   * Creates a new copy of this expression with new children. This is used during transformation
+   * if children change.
+   */
+  private[flink] def makeCopy(newArgs: Array[AnyRef]): A = {
+    val ctors = getClass.getConstructors.filter(_.getParameterTypes.length > 0)
+    if (ctors.isEmpty) {
+      throw new RuntimeException(s"No valid constructor for ${getClass.getSimpleName}")
+    }
+
+    val defaultCtor = ctors.find { ctor =>
+      if (ctor.getParameterTypes.length != newArgs.length) {
+        false
+      } else if (newArgs.contains(null)) {
+        false
+      } else {
+        val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
+        TypeInfoCheckUtils.isAssignable(argsClasses, ctor.getParameterTypes)
+      }
+    }.getOrElse(ctors.maxBy(_.getParameterTypes.length))
+
+    try {
+      defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
+    } catch {
+      case e: Throwable =>
+        throw new RuntimeException(
+          s"Fail to copy tree node ${getClass.getName}.", e)
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
index 2f51b5b..bbea8a8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
@@ -24,9 +24,8 @@ import org.apache.flink.table.catalog.{FunctionCatalog, FunctionLookup}
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{AND, CAST, OR}
-import org.apache.flink.table.types.LogicalTypeDataTypeConverter
+import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
-import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.Preconditions
 
@@ -206,7 +205,7 @@ class RexNodeToExpressionConverter(
     Preconditions.checkArgument(inputRef.getIndex < inputNames.length)
     Some(new FieldReferenceExpression(
       inputNames(inputRef.getIndex),
-      TypeConversions.fromLogicalToDataType(FlinkTypeFactory.toLogicalType(inputRef.getType)),
+      fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(inputRef.getType)),
       0,
       inputRef.getIndex
     ))
@@ -283,7 +282,7 @@ class RexNodeToExpressionConverter(
     }
 
     Some(valueLiteral(literalValue,
-      LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(literalType)))
+      fromLogicalTypeToDataType(literalType)))
   }
 
   override def visitCall(rexCall: RexCall): Option[Expression] = {
@@ -302,7 +301,7 @@ class RexNodeToExpressionConverter(
           Option(operands.reduceLeft { (l, r) => unresolvedCall(AND, l, r) })
         case SqlStdOperatorTable.CAST =>
           Option(unresolvedCall(CAST, operands.head,
-            typeLiteral(LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
+            typeLiteral(fromLogicalTypeToDataType(
               FlinkTypeFactory.toLogicalType(rexCall.getType)))))
         case function: SqlFunction =>
           lookupFunction(replace(function.getName), operands)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeInfoCheckUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeInfoCheckUtils.scala
new file mode 100644
index 0000000..d73d9aa
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeInfoCheckUtils.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo._
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, PojoTypeInfo}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.{INTERVAL_MILLIS, INTERVAL_MONTHS}
+import org.apache.flink.table.validate._
+
+object TypeInfoCheckUtils {
+
+  /**
+    * Checks if type information is an advanced type that can be converted to a
+    * SQL type but NOT vice versa.
+    */
+  def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: TimeIndicatorTypeInfo => false
+    case _: BasicTypeInfo[_] => false
+    case _: SqlTimeTypeInfo[_] => false
+    case _: TimeIntervalTypeInfo[_] => false
+    case _ => true
+  }
+
+  /**
+    * Checks if type information is a simple type that can be converted to a
+    * SQL type and vice versa.
+    */
+  def isSimple(dataType: TypeInformation[_]): Boolean = !isAdvanced(dataType)
+
+  def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: NumericTypeInfo[_] => true
+    case BIG_DEC_TYPE_INFO => true
+    case _ => false
+  }
+
+  def isTemporal(dataType: TypeInformation[_]): Boolean =
+    isTimePoint(dataType) || isTimeInterval(dataType)
+
+  def isTimePoint(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[SqlTimeTypeInfo[_]]
+
+  def isTimeInterval(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[TimeIntervalTypeInfo[_]]
+
+  def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO
+
+  def isBoolean(dataType: TypeInformation[_]): Boolean = dataType == BOOLEAN_TYPE_INFO
+
+  def isDecimal(dataType: TypeInformation[_]): Boolean = dataType == BIG_DEC_TYPE_INFO
+
+  def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO
+
+  def isIntegerFamily(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[IntegerTypeInfo[_]]
+
+  def isLong(dataType: TypeInformation[_]): Boolean = dataType == LONG_TYPE_INFO
+
+  def isIntervalMonths(dataType: TypeInformation[_]): Boolean = dataType == INTERVAL_MONTHS
+
+  def isIntervalMillis(dataType: TypeInformation[_]): Boolean = dataType == INTERVAL_MILLIS
+
+  def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: ObjectArrayTypeInfo[_, _] |
+         _: BasicArrayTypeInfo[_, _] |
+         _: PrimitiveArrayTypeInfo[_]  => true
+    case _ => false
+  }
+
+  def isMap(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[MapTypeInfo[_, _]]
+
+  def isComparable(dataType: TypeInformation[_]): Boolean =
+    classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType)
+
+  /**
+    * Types that can be easily converted into a string without ambiguity.
+    */
+  def isSimpleStringRepresentation(dataType: TypeInformation[_]): Boolean =
+    isNumeric(dataType) || isString(dataType) || isTemporal(dataType) || isBoolean(dataType)
+
+  def assertNumericExpr(
+      dataType: TypeInformation[_],
+      caller: String)
+  : ValidationResult = dataType match {
+    case _: NumericTypeInfo[_] =>
+      ValidationSuccess
+    case BIG_DEC_TYPE_INFO =>
+      ValidationSuccess
+    case _ =>
+      ValidationFailure(s"$caller requires numeric types, get $dataType here")
+  }
+
+  def assertIntegerFamilyExpr(
+      dataType: TypeInformation[_],
+      caller: String)
+  : ValidationResult = dataType match {
+    case _: IntegerTypeInfo[_] =>
+      ValidationSuccess
+    case _ =>
+      ValidationFailure(s"$caller requires integer types but was '$dataType'.")
+  }
+
+  def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ValidationResult = {
+    if (dataType.isSortKeyType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$caller requires orderable types, get $dataType here")
+    }
+  }
+
+  /**
+    * Checks whether a type implements own hashCode() and equals() methods for storing an instance
+    * in Flink's state or performing a keyBy operation.
+    *
+    * @param name name of the operation
+    * @param t type information to be validated
+    */
+  def validateEqualsHashCode(name: String, t: TypeInformation[_]): Unit = t match {
+
+    // make sure that a POJO class is a valid state type
+    case pt: PojoTypeInfo[_] =>
+      // we don't check the types recursively to give a chance of wrapping
+      // proper hashCode/equals methods around an immutable type
+      validateEqualsHashCode(name, pt.getClass)
+    // recursively check composite types
+    case ct: CompositeType[_] =>
+      validateEqualsHashCode(name, t.getTypeClass)
+      // we check recursively for entering Flink types such as tuples and rows
+      for (i <- 0 until ct.getArity) {
+        val subtype = ct.getTypeAt(i)
+        validateEqualsHashCode(name, subtype)
+      }
+    // check other type information only based on the type class
+    case _: TypeInformation[_] =>
+      validateEqualsHashCode(name, t.getTypeClass)
+  }
+
+  /**
+    * Checks whether a class implements own hashCode() and equals() methods for storing an instance
+    * in Flink's state or performing a keyBy operation.
+    *
+    * @param name name of the operation
+    * @param c class to be validated
+    */
+  def validateEqualsHashCode(name: String, c: Class[_]): Unit = {
+
+    // skip primitives
+    if (!c.isPrimitive) {
+      // check the component type of arrays
+      if (c.isArray) {
+        validateEqualsHashCode(name, c.getComponentType)
+      }
+      // check type for methods
+      else {
+        if (c.getMethod("hashCode").getDeclaringClass eq classOf[Object]) {
+          throw new ValidationException(
+            s"Type '${c.getCanonicalName}' cannot be used in a $name operation because it " +
+                s"does not implement a proper hashCode() method.")
+        }
+        if (c.getMethod("equals", classOf[Object]).getDeclaringClass eq classOf[Object]) {
+          throw new ValidationException(
+            s"Type '${c.getCanonicalName}' cannot be used in a $name operation because it " +
+                s"does not implement a proper equals() method.")
+        }
+      }
+    }
+  }
+
+  /**
+    * Checks if a class is a Java primitive wrapper.
+    */
+  def isPrimitiveWrapper(clazz: Class[_]): Boolean = {
+    clazz == classOf[java.lang.Boolean] ||
+        clazz == classOf[java.lang.Byte] ||
+        clazz == classOf[java.lang.Character] ||
+        clazz == classOf[java.lang.Short] ||
+        clazz == classOf[java.lang.Integer] ||
+        clazz == classOf[java.lang.Long] ||
+        clazz == classOf[java.lang.Double] ||
+        clazz == classOf[java.lang.Float]
+  }
+
+  /**
+    * Checks if one class can be assigned to a variable of another class.
+    *
+    * Adopted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class[], java.lang.Class[])
+    * but without null checks.
+    */
+  def isAssignable(classArray: Array[Class[_]], toClassArray: Array[Class[_]]): Boolean = {
+    if (classArray.length != toClassArray.length) {
+      return false
+    }
+    var i = 0
+    while (i < classArray.length) {
+      if (!isAssignable(classArray(i), toClassArray(i))) {
+        return false
+      }
+      i += 1
+    }
+    true
+  }
+
+  /**
+    * Checks if one class can be assigned to a variable of another class.
+    *
+    * Adopted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class, java.lang.Class) but
+    * without null checks.
+    */
+  def isAssignable(cls: Class[_], toClass: Class[_]): Boolean = {
+    if (cls.equals(toClass)) {
+      return true
+    }
+    if (cls.isPrimitive) {
+      if (!toClass.isPrimitive) {
+        return false
+      }
+      if (java.lang.Integer.TYPE.equals(cls)) {
+        return java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Long.TYPE.equals(cls)) {
+        return java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Boolean.TYPE.equals(cls)) {
+        return false
+      }
+      if (java.lang.Double.TYPE.equals(cls)) {
+        return false
+      }
+      if (java.lang.Float.TYPE.equals(cls)) {
+        return java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Character.TYPE.equals(cls)) {
+        return java.lang.Integer.TYPE.equals(toClass) ||
+            java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Short.TYPE.equals(cls)) {
+        return java.lang.Integer.TYPE.equals(toClass) ||
+            java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Byte.TYPE.equals(cls)) {
+        return java.lang.Short.TYPE.equals(toClass) ||
+            java.lang.Integer.TYPE.equals(toClass) ||
+            java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      // should never get here
+      return false
+    }
+    toClass.isAssignableFrom(cls)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
new file mode 100644
index 0000000..64a568b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.validate
+
+/**
+  * Represents the result of a validation.
+  */
+sealed trait ValidationResult {
+  def isFailure: Boolean = !isSuccess
+  def isSuccess: Boolean
+
+  /**
+    * Allows constructing a cascade of validation results.
+    * The first failure result will be returned.
+    */
+  def orElse(other: ValidationResult): ValidationResult = {
+    if (isSuccess) {
+      other
+    } else {
+      this
+    }
+  }
+}
+
+/**
+  * Represents the successful result of a validation.
+  */
+object ValidationSuccess extends ValidationResult {
+  val isSuccess: Boolean = true
+}
+
+/**
+  * Represents the failing result of a validation,
+  * with a error message to show the reason of failure.
+  */
+case class ValidationFailure(message: String) extends ValidationResult {
+  val isSuccess: Boolean = false
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala
new file mode 100644
index 0000000..6ee4b19
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{lookupCall, unresolvedCall, unresolvedRef}
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions
+
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+  * Tests keyword as suffix.
+  */
+class KeywordParseTest {
+
+  @Test
+  def testKeyword(): Unit = {
+    assertEquals(
+      unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, unresolvedRef("f0")),
+      ExpressionParser.parseExpression("f0.asc"))
+    assertEquals(
+      unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, unresolvedRef("f0")),
+      ExpressionParser.parseExpression("f0.asc()"))
+  }
+
+  @Test
+  def testKeywordAsPrefixInFunctionName(): Unit = {
+    assertEquals(
+      lookupCall("ascii", unresolvedRef("f0")),
+      ExpressionParser.parseExpression("f0.ascii()"))
+  }
+
+  @Test
+  def testKeywordAsInfixInFunctionName(): Unit = {
+    assertEquals(
+      lookupCall("iiascii", unresolvedRef("f0")),
+      ExpressionParser.parseExpression("f0.iiascii()"))
+  }
+
+  @Test
+  def testKeywordAsSuffixInFunctionName(): Unit = {
+    assertEquals(
+      lookupCall("iiasc", unresolvedRef("f0")),
+      ExpressionParser.parseExpression("f0.iiasc()"))
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
index e4641af..79311e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.util
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.catalog.FunctionCatalog
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
-import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.expressions.utils.Func1
+import org.apache.flink.table.expressions.{EqualTo, Expression, ExpressionBridge, ExpressionParser, GreaterThan, Literal, PlannerExpression, PlannerExpressionConverter, Sum, UnresolvedFieldReference}
 import org.apache.flink.table.functions.AggregateFunctionDefinition
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{DIVIDE, EQUALS, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL, MINUS, NOT, NOT_EQUALS, OR, PLUS, TIMES}
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{EQUALS, GREATER_THAN, LESS_THAN, LESS_THAN_OR_EQUAL}
 import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.functions.utils.ScalarSqlFunction
 import org.apache.flink.table.plan.util.InputTypeBuilder.inputOf
@@ -55,6 +55,11 @@ class RexNodeExtractorTest extends RexNodeTestBase {
 
   private val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
 
+  private val expressionBridge: ExpressionBridge[PlannerExpression] =
+    new ExpressionBridge[PlannerExpression](
+      functionCatalog,
+      PlannerExpressionConverter.INSTANCE)
+
   @Test
   def testExtractRefInputFields(): Unit = {
     val usedFields = RexNodeExtractor.extractRefInputFields(buildExprs())
@@ -215,13 +220,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
     val builder: RexBuilder = new RexBuilder(typeFactory)
     val expr = buildConditionExpr()
 
-    // id > 6
-    val firstExp = unresolvedCall(GREATER_THAN, unresolvedRef("id"), valueLiteral(6))
-
-    // amount * price < 100
-    val secondExp = unresolvedCall(LESS_THAN,
-      unresolvedCall(TIMES, unresolvedRef("amount"), unresolvedRef("price")),
-      valueLiteral(100))
+    val firstExp = ExpressionParser.parseExpression("id > 6")
+    val secondExp = ExpressionParser.parseExpression("amount * price < 100")
     val expected: Array[Expression] = Array(firstExp, secondExp)
 
     val (convertedExpressions, unconvertedRexNodes) =
@@ -232,7 +232,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
         builder,
         functionCatalog)
 
-    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertPlannerExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
 
@@ -255,10 +255,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
         relBuilder,
         functionCatalog)
 
-    // amount >= id
-    val expr = unresolvedCall(GREATER_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id"))
-    val expected: Array[Expression] = Array(expr)
-    assertExpressionArrayEquals(expected, convertedExpressions)
+    val expected: Array[Expression] = Array(ExpressionParser.parseExpression("amount >= id"))
+    assertPlannerExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
 
@@ -307,25 +305,10 @@ class RexNodeExtractorTest extends RexNodeTestBase {
         functionCatalog)
 
     val expected: Array[Expression] = Array(
-      // amount < 100 || price == 100 || price == 200
-      unresolvedCall(OR,
-        unresolvedCall(OR,
-          unresolvedCall(LESS_THAN, unresolvedRef("amount"), valueLiteral(100)),
-          unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(100))),
-        unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(200))
-      ),
-      // id > 100 || price == 100 || price == 200
-      unresolvedCall(OR,
-        unresolvedCall(OR,
-          unresolvedCall(GREATER_THAN, unresolvedRef("id"), valueLiteral(100)),
-          unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(100))),
-        unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(200))
-      ),
-      // not(amount <= id)
-      unresolvedCall(NOT,
-        unresolvedCall(LESS_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id")))
-    )
-    assertExpressionArrayEquals(expected, convertedExpressions)
+      ExpressionParser.parseExpression("amount < 100 || price == 100 || price === 200"),
+      ExpressionParser.parseExpression("id > 100 || price == 100 || price === 200"),
+      ExpressionParser.parseExpression("!(amount <= id)"))
+    assertPlannerExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
 
@@ -362,17 +345,13 @@ class RexNodeExtractorTest extends RexNodeTestBase {
         functionCatalog)
 
     val expected: Array[Expression] = Array(
-      // amount < 100
-      unresolvedCall(LESS_THAN, unresolvedRef("amount"), valueLiteral(100)),
-      // amount <= id
-      unresolvedCall(LESS_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id")),
-      // id > 100
-      unresolvedCall(GREATER_THAN, unresolvedRef("id"), valueLiteral(100)),
-      // price === 100
-      unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(100))
+      ExpressionParser.parseExpression("amount < 100"),
+      ExpressionParser.parseExpression("amount <= id"),
+      ExpressionParser.parseExpression("id > 100"),
+      ExpressionParser.parseExpression("price === 100")
     )
 
-    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertPlannerExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
 
@@ -421,7 +400,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
       unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(200.1))
     )
 
-    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertPlannerExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
 
@@ -453,22 +432,41 @@ class RexNodeExtractorTest extends RexNodeTestBase {
       relBuilder,
       functionCatalog)
 
-    val expected = Array[Expression](
-      // timestamp_col = '2017-09-10 14:23:01'
-      unresolvedCall(EQUALS, unresolvedRef("timestamp_col"), valueLiteral(
-        new Timestamp(DateTimeUtils.timestampStringToUnixDate("2017-09-10 14:23:01")))
-      ),
-      // date_col = '2017-09-12'
-      unresolvedCall(EQUALS, unresolvedRef("date_col"), valueLiteral(
-        new Date(DateTimeUtils.dateStringToUnixDate("2017-09-12") * DateTimeUtils.MILLIS_PER_DAY))
-      ),
-      // time_col = '14:23:01'
-      unresolvedCall(EQUALS, unresolvedRef("time_col"), valueLiteral(
-        new Time(DateTimeUtils.timeStringToUnixDate("14:23:01").longValue()))
+    val timestamp = new Timestamp(DateTimeUtils.timestampStringToUnixDate("2017-09-10 14:23:01"))
+    val date = new Date(
+      DateTimeUtils.dateStringToUnixDate("2017-09-12") * DateTimeUtils.MILLIS_PER_DAY)
+    val time = new Time(DateTimeUtils.timeStringToUnixDate("14:23:01").longValue())
+
+    {
+      val expected = Array[Expression](
+        // timestamp_col = '2017-09-10 14:23:01'
+        unresolvedCall(EQUALS, unresolvedRef("timestamp_col"), valueLiteral(timestamp)),
+        // date_col = '2017-09-12'
+        unresolvedCall(EQUALS, unresolvedRef("date_col"), valueLiteral(date)),
+        // time_col = '14:23:01'
+        unresolvedCall(EQUALS, unresolvedRef("time_col"), valueLiteral(time))
       )
-    )
 
-    assertExpressionArrayEquals(expected, converted)
+      assertExpressionArrayEquals(expected, converted)
+    }
+
+    {
+      val expected = Array[Expression](
+        EqualTo(
+          UnresolvedFieldReference("timestamp_col"),
+          Literal(timestamp)
+        ),
+        EqualTo(
+          UnresolvedFieldReference("date_col"),
+          Literal(date)
+        ),
+        EqualTo(
+          UnresolvedFieldReference("time_col"),
+          Literal(time)
+        )
+      )
+      assertPlannerExpressionArrayEquals(expected, converted)
+    }
   }
 
   @Test
@@ -522,34 +520,18 @@ class RexNodeExtractorTest extends RexNodeTestBase {
         functionCatalog)
 
     val expected: Array[Expression] = Array(
-      // amount < id
-      unresolvedCall(LESS_THAN, unresolvedRef("amount"), unresolvedRef("id")),
-      // amount <= id
-      unresolvedCall(LESS_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id")),
-      // amount <> id
-      unresolvedCall(NOT_EQUALS, unresolvedRef("amount"), unresolvedRef("id")),
-      // amount = id
-      unresolvedCall(EQUALS, unresolvedRef("amount"), unresolvedRef("id")),
-      // amount >= id
-      unresolvedCall(GREATER_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id")),
-      // amount > id
-      unresolvedCall(GREATER_THAN, unresolvedRef("amount"), unresolvedRef("id")),
-      // amount + id == 100
-      unresolvedCall(EQUALS,
-        unresolvedCall(PLUS, unresolvedRef("amount"), unresolvedRef("id")), valueLiteral(100)),
-      // amount - id == 100
-      unresolvedCall(EQUALS,
-        unresolvedCall(MINUS, unresolvedRef("amount"), unresolvedRef("id")), valueLiteral(100)),
-      // amount * id == 100
-      unresolvedCall(EQUALS,
-        unresolvedCall(TIMES, unresolvedRef("amount"), unresolvedRef("id")), valueLiteral(100)),
-      // amount / id == 100
-      unresolvedCall(EQUALS,
-        unresolvedCall(DIVIDE, unresolvedRef("amount"), unresolvedRef("id")), valueLiteral(100))
-      // -amount == 100
-      // ExpressionParser.parseExpression("-amount == 100")
+      ExpressionParser.parseExpression("amount < id"),
+      ExpressionParser.parseExpression("amount <= id"),
+      ExpressionParser.parseExpression("amount <> id"),
+      ExpressionParser.parseExpression("amount == id"),
+      ExpressionParser.parseExpression("amount >= id"),
+      ExpressionParser.parseExpression("amount > id"),
+      ExpressionParser.parseExpression("amount + id == 100"),
+      ExpressionParser.parseExpression("amount - id == 100"),
+      ExpressionParser.parseExpression("amount * id == 100"),
+      ExpressionParser.parseExpression("amount / id == 100")
     )
-    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertPlannerExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
 
@@ -591,17 +573,25 @@ class RexNodeExtractorTest extends RexNodeTestBase {
       relBuilder,
       functionCatalog)
 
-    val expected: Array[Expression] = Array(
-      // sum(amount) > 100
-      unresolvedCall(GREATER_THAN,
-        unresolvedCall(
-          new AggregateFunctionDefinition("sum", new IntSumAggFunction, Types.INT, Types.INT),
-          unresolvedRef("amount")),
-        valueLiteral(100)
+    {
+      val expected: Array[Expression] = Array(
+        // sum(amount) > 100
+        unresolvedCall(GREATER_THAN,
+          unresolvedCall(
+            new AggregateFunctionDefinition("sum", new IntSumAggFunction, Types.INT, Types.INT),
+            unresolvedRef("amount")),
+          valueLiteral(100)
+        )
       )
-    )
-    assertExpressionArrayEquals(expected, convertedExpressions)
-    assertEquals(1, unconvertedRexNodes.length)
+      assertExpressionArrayEquals(expected, convertedExpressions)
+      assertEquals(1, unconvertedRexNodes.length)
+    }
+
+    {
+      val expected: Array[Expression] = Array(
+        GreaterThan(Sum(UnresolvedFieldReference("amount")), Literal(100)))
+      assertPlannerExpressionArrayEquals(expected, convertedExpressions)
+    }
   }
 
   @Test
@@ -649,6 +639,10 @@ class RexNodeExtractorTest extends RexNodeTestBase {
     assertEquals("or(greaterThan(cast(amount, BIGINT), 100), lessThanOrEqual(amount, id))",
       convertedExpressions(2).toString)
     assertEquals(0, unconvertedRexNodes.length)
+
+    assertPlannerExpressionArrayEquals(
+      Array(ExpressionParser.parseExpression("amount <= id")),
+      Array(convertedExpressions(1)))
   }
 
   @Test
@@ -759,4 +753,17 @@ class RexNodeExtractorTest extends RexNodeTestBase {
     }
   }
 
+  private def assertPlannerExpressionArrayEquals(
+      expected: Array[Expression],
+      actual: Array[Expression]): Unit = {
+    // TODO we assume only planner expression as a temporary solution to keep the old interfaces
+    val sortedExpected = expected.map(expressionBridge.bridge).sortBy(e => e.toString)
+    val sortedActual = actual.map(expressionBridge.bridge).sortBy(e => e.toString)
+
+    assertEquals(sortedExpected.length, sortedActual.length)
+    sortedExpected.zip(sortedActual).foreach {
+      case (l, r) => assertEquals(l.toString, r.toString)
+    }
+  }
+
 }