You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/03 06:06:50 UTC
[flink] 02/02: [FLINK-13049][table-planner-blink] Port planner
expressions to blink-planner from flink-planner
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 886b01d8b21ed378cbffa6d01217f960eb308770
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Jul 2 14:27:07 2019 +0800
[FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner
This closes #8942
---
.../expressions/PlannerTypeInferenceUtilImpl.java | 142 ++++
.../table/functions/sql/FlinkSqlOperatorTable.java | 1 +
.../table/api/ExpressionParserException.scala | 60 ++
.../flink/table/calcite/FlinkRelBuilder.scala | 4 +-
.../flink/table/calcite/FlinkTypeFactory.scala | 21 +-
.../flink/table/expressions/ExpressionBridge.scala | 40 +
.../flink/table/expressions/InputTypeSpec.scala | 69 ++
.../table/expressions/PlannerExpression.scala | 98 +++
.../expressions/PlannerExpressionConverter.scala | 836 +++++++++++++++++++++
.../expressions/PlannerExpressionParserImpl.scala | 726 ++++++++++++++++++
.../table/expressions/PlannerExpressionUtils.scala | 68 ++
.../flink/table/expressions/aggregations.scala | 439 +++++++++++
.../flink/table/expressions/arithmetic.scala | 165 ++++
.../org/apache/flink/table/expressions/call.scala | 326 ++++++++
.../org/apache/flink/table/expressions/cast.scala | 59 ++
.../flink/table/expressions/collection.scala | 235 ++++++
.../flink/table/expressions/comparison.scala | 242 ++++++
.../apache/flink/table/expressions/composite.scala | 108 +++
.../flink/table/expressions/fieldExpression.scala | 253 +++++++
.../flink/table/expressions/hashExpressions.scala | 124 +++
.../apache/flink/table/expressions/literals.scala | 139 ++++
.../org/apache/flink/table/expressions/logic.scala | 109 +++
.../flink/table/expressions/mathExpressions.scala | 532 +++++++++++++
.../apache/flink/table/expressions/ordering.scala | 54 ++
.../flink/table/expressions/overOffsets.scala | 54 ++
.../apache/flink/table/expressions/package.scala | 29 +
.../table/expressions/stringExpressions.scala | 585 ++++++++++++++
.../apache/flink/table/expressions/subquery.scala | 95 +++
.../apache/flink/table/expressions/symbols.scala | 134 ++++
.../org/apache/flink/table/expressions/time.scala | 369 +++++++++
.../flink/table/expressions/windowProperties.scala | 67 ++
.../functions/utils/UserDefinedFunctionUtils.scala | 8 +
.../org/apache/flink/table/plan/TreeNode.scala | 115 +++
.../flink/table/plan/util/RexNodeExtractor.scala | 9 +-
.../flink/table/typeutils/TypeInfoCheckUtils.scala | 277 +++++++
.../flink/table/validate/ValidationResult.scala | 53 ++
.../flink/table/expressions/KeywordParseTest.scala | 62 ++
.../table/plan/util/RexNodeExtractorTest.scala | 195 ++---
38 files changed, 6801 insertions(+), 101 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
new file mode 100644
index 0000000..816e783
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeInferenceUtil;
+import org.apache.flink.table.typeutils.TypeCoercion;
+import org.apache.flink.table.validate.ValidationFailure;
+import org.apache.flink.table.validate.ValidationResult;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava;
+
+/**
+ * Implementation of {@link PlannerTypeInferenceUtil}.
+ */
+@Internal
+public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil {
+
+ private static final PlannerExpressionConverter CONVERTER = PlannerExpressionConverter.INSTANCE();
+
+ @Override
+ public TypeInferenceUtil.Result runTypeInference(
+ UnresolvedCallExpression unresolvedCall,
+ List<ResolvedExpression> resolvedArgs) {
+ final PlannerExpression plannerCall = unresolvedCall.accept(CONVERTER);
+
+ if (plannerCall instanceof InputTypeSpec) {
+ return resolveWithCastedAssignment(
+ unresolvedCall,
+ resolvedArgs,
+ toJava(((InputTypeSpec) plannerCall).expectedTypes()),
+ plannerCall.resultType());
+ } else {
+ validateArguments(plannerCall);
+
+ final List<DataType> expectedArgumentTypes = resolvedArgs.stream()
+ .map(ResolvedExpression::getOutputDataType)
+ .collect(Collectors.toList());
+
+ return new TypeInferenceUtil.Result(
+ expectedArgumentTypes,
+ null,
+ fromLegacyInfoToDataType(plannerCall.resultType()));
+ }
+ }
+
+ private TypeInferenceUtil.Result resolveWithCastedAssignment(
+ UnresolvedCallExpression unresolvedCall,
+ List<ResolvedExpression> args,
+ List<TypeInformation<?>> expectedTypes,
+ TypeInformation<?> resultType) {
+
+ final List<PlannerExpression> plannerArgs = unresolvedCall.getChildren()
+ .stream()
+ .map(e -> e.accept(CONVERTER))
+ .collect(Collectors.toList());
+
+ final List<DataType> castedArgs = IntStream.range(0, plannerArgs.size())
+ .mapToObj(idx -> castIfNeeded(
+ args.get(idx),
+ plannerArgs.get(idx),
+ expectedTypes.get(idx)))
+ .collect(Collectors.toList());
+
+ return new TypeInferenceUtil.Result(
+ castedArgs,
+ null,
+ fromLegacyInfoToDataType(resultType));
+ }
+
+ private void validateArguments(PlannerExpression plannerCall) {
+ if (!plannerCall.valid()) {
+ throw new ValidationException(
+ getValidationErrorMessage(plannerCall)
+ .orElse("Unexpected behavior, validation failed but can't get error messages!"));
+ }
+ }
+
+ /**
+ * Returns the validation error message of this {@link PlannerExpression}, or, if this
+ * expression itself passes validation, the first error message found among its children
+ * (searched recursively). Returns empty if all validations succeeded.
+ */
+ private Optional<String> getValidationErrorMessage(PlannerExpression plannerCall) {
+ ValidationResult validationResult = plannerCall.validateInput();
+ if (validationResult instanceof ValidationFailure) {
+ return Optional.of(((ValidationFailure) validationResult).message());
+ } else {
+ for (Expression plannerExpression: plannerCall.getChildren()) {
+ Optional<String> errorMessage = getValidationErrorMessage((PlannerExpression) plannerExpression);
+ if (errorMessage.isPresent()) {
+ return errorMessage;
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ private DataType castIfNeeded(
+ ResolvedExpression child,
+ PlannerExpression plannerChild,
+ TypeInformation<?> expectedType) {
+ TypeInformation<?> actualType = plannerChild.resultType();
+ if (actualType.equals(expectedType)) {
+ return child.getOutputDataType();
+ } else if (TypeCoercion.canSafelyCast(
+ fromTypeInfoToLogicalType(actualType), fromTypeInfoToLogicalType(expectedType))) {
+ return fromLegacyInfoToDataType(expectedType);
+ } else {
+ throw new ValidationException(String.format("Incompatible type of argument: %s Expected: %s",
+ child,
+ expectedType));
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
index f6c8d6a..38ddf18 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
@@ -1098,6 +1098,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFunction RAND_INTEGER = SqlStdOperatorTable.RAND_INTEGER;
public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD;
public static final SqlFunction TIMESTAMP_DIFF = SqlStdOperatorTable.TIMESTAMP_DIFF;
+ public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
// MATCH_RECOGNIZE
public static final SqlFunction FIRST = SqlStdOperatorTable.FIRST;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/ExpressionParserException.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/ExpressionParserException.scala
new file mode 100644
index 0000000..91283bf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/ExpressionParserException.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+/**
+ * Exception for all errors occurring during expression parsing.
+ */
+case class ExpressionParserException(msg: String) extends RuntimeException(msg)
+
+/**
+ * Exception for unwanted method calls on an unresolved expression.
+ */
+case class UnresolvedException(msg: String) extends RuntimeException(msg)
+
+/**
+ * Exception for adding a table that already exists.
+ *
+ * @param catalog catalog name
+ * @param table table name
+ * @param cause the cause
+ */
+case class TableAlreadyExistException(
+ catalog: String,
+ table: String,
+ cause: Throwable)
+ extends RuntimeException(s"Table $catalog.$table already exists.", cause) {
+
+ def this(catalog: String, table: String) = this(catalog, table, null)
+
+}
+
+/**
+ * Exception for adding a catalog that already exists.
+ *
+ * @param catalog catalog name
+ * @param cause the cause
+ */
+case class CatalogAlreadyExistException(
+ catalog: String,
+ cause: Throwable)
+ extends RuntimeException(s"Catalog $catalog already exists.", cause) {
+
+ def this(catalog: String) = this(catalog, null)
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index 8b5cf8a..b560fa5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.calcite
import org.apache.flink.table.calcite.FlinkRelFactories.{ExpandFactory, RankFactory, SinkFactory}
-import org.apache.flink.table.expressions.PlannerWindowProperty
+import org.apache.flink.table.expressions.{PlannerWindowProperty, WindowProperty}
import org.apache.flink.table.operations.QueryOperation
import org.apache.flink.table.plan.QueryOperationConverter
import org.apache.flink.table.runtime.rank.{RankRange, RankType}
@@ -113,6 +113,8 @@ object FlinkRelBuilder {
*/
case class PlannerNamedWindowProperty(name: String, property: PlannerWindowProperty)
+ case class NamedWindowProperty(name: String, property: WindowProperty)
+
def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() {
def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder =
new FlinkRelBuilder(context, cluster, schema)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index ce40ae2..59a5502 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -18,12 +18,13 @@
package org.apache.flink.table.calcite
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo
+import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.table.api.{DataTypes, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory.toLogicalType
import org.apache.flink.table.plan.schema.{GenericRelDataType, _}
import org.apache.flink.table.types.logical._
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.types.Nothing
import org.apache.flink.util.Preconditions.checkArgument
@@ -369,6 +370,24 @@ object FlinkTypeFactory {
case _ => false
}
+ @Deprecated
+ def isProctimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+ case ti: TimeIndicatorTypeInfo if !ti.isEventTime => true
+ case _ => false
+ }
+
+ @Deprecated
+ def isRowtimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+ case ti: TimeIndicatorTypeInfo if ti.isEventTime => true
+ case _ => false
+ }
+
+ @Deprecated
+ def isTimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+ case ti: TimeIndicatorTypeInfo => true
+ case _ => false
+ }
+
def toLogicalType(relDataType: RelDataType): LogicalType = {
val logicalType = relDataType.getSqlTypeName match {
case BOOLEAN => new BooleanType()
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala
new file mode 100644
index 0000000..7000bad
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.table.catalog.FunctionLookup
+import org.apache.flink.table.expressions.resolver.LookupCallResolver
+
+/**
+ * Bridges between API [[Expression]]s (for both Java and Scala) and the final expression stack.
+ */
+class ExpressionBridge[E <: Expression](
+ functionCatalog: FunctionLookup,
+ finalVisitor: ExpressionVisitor[E]) {
+
+ private val callResolver = new LookupCallResolver(functionCatalog)
+
+ def bridge(expression: Expression): E = {
+ // resolve calls
+ val resolvedExpressionTree = expression.accept(callResolver)
+
+ // convert to final expressions
+ resolvedExpressionTree.accept(finalVisitor)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
new file mode 100644
index 0000000..3506e9a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import scala.collection.mutable
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.validate._
+
+/**
+ * Expressions that have a strict data type specification for their inputs.
+ */
+trait InputTypeSpec extends PlannerExpression {
+
+ /**
+ * Input type specification for each child.
+ *
+ * For example, [[Power]], which expects both of its children to be of double type, should use:
+ * {{{
+ * def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+ * }}}
+ *
+ * Inputs that don't match the expected type will be safely cast to a higher type. Therefore,
+ * use the decimal type with caution, as all numeric types would be cast to a very
+ * inefficient type.
+ */
+ private[flink] def expectedTypes: Seq[TypeInformation[_]]
+
+ override private[flink] def validateInput(): ValidationResult = {
+ val typeMismatches = mutable.ArrayBuffer.empty[String]
+
+ if(expectedTypes.size != children.size){
+ return ValidationFailure(
+ s"""|$this fails on input type size checking: expected types size[${expectedTypes.size}].
+ |Operands types size[${children.size}].
+ |""".stripMargin)
+ }
+
+ children.zip(expectedTypes).zipWithIndex.foreach { case ((e, tpe), i) =>
+ if (e.resultType != tpe) {
+ typeMismatches += s"expecting $tpe on ${i}th input, get ${e.resultType}"
+ }
+ }
+ if (typeMismatches.isEmpty) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(
+ s"""|$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}.
+ |Operand should be casted to proper type
+ |""".stripMargin)
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpression.scala
new file mode 100644
index 0000000..92efcb5
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpression.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import java.util
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.TreeNode
+import org.apache.flink.table.validate.{ValidationResult, ValidationSuccess}
+
+import _root_.scala.collection.JavaConversions._
+
+abstract class PlannerExpression extends TreeNode[PlannerExpression] with Expression {
+ /**
+ * Returns the [[TypeInformation]] for evaluating this expression.
+ * It is sometimes not available until the expression is valid.
+ */
+ private[flink] def resultType: TypeInformation[_]
+
+ /**
+ * One pass validation of the expression tree in post order.
+ */
+ private[flink] lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+ private[flink] def childrenValid: Boolean = children.forall(_.valid)
+
+ /**
+ * Check input data types, inputs number or other properties specified by this expression.
+ * Return `ValidationSuccess` if it passes the check,
+ * or `ValidationFailure` with a supplementary message explaining the error.
+ * Note: this method should only be called once `childrenValid == true`.
+ */
+ private[flink] def validateInput(): ValidationResult = ValidationSuccess
+
+ /**
+ * Convert Expression to its counterpart in Calcite, i.e. RexNode
+ */
+ private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+ throw new UnsupportedOperationException(
+ s"${this.getClass.getName} cannot be transformed to RexNode"
+ )
+
+ private[flink] def checkEquals(other: PlannerExpression): Boolean = {
+ if (this.getClass != other.getClass) {
+ false
+ } else {
+ def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
+ elements1.length == elements2.length && elements1.zip(elements2).forall {
+ case (e1: PlannerExpression, e2: PlannerExpression) => e1.checkEquals(e2)
+ case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
+ case (i1, i2) => i1 == i2
+ }
+ }
+ val elements1 = this.productIterator.toSeq
+ val elements2 = other.productIterator.toSeq
+ checkEquality(elements1, elements2)
+ }
+ }
+
+ override def asSummaryString(): String = toString
+
+ override def getChildren: util.List[Expression] = children
+
+ override def accept[R](visitor: ExpressionVisitor[R]): R = visitor.visit(this)
+}
+
+abstract class BinaryExpression extends PlannerExpression {
+ private[flink] def left: PlannerExpression
+ private[flink] def right: PlannerExpression
+ private[flink] def children = Seq(left, right)
+}
+
+abstract class UnaryExpression extends PlannerExpression {
+ private[flink] def child: PlannerExpression
+ private[flink] def children = Seq(child)
+}
+
+abstract class LeafExpression extends PlannerExpression {
+ private[flink] val children = Nil
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
new file mode 100644
index 0000000..d52d6e6a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -0,0 +1,836 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
+import org.apache.flink.table.expressions.{E => PlannerE, UUID => PlannerUUID}
+import org.apache.flink.table.functions._
+import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
+import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+ * Visitor implementation for converting [[Expression]]s to [[PlannerExpression]]s.
+ */
+class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExpression] {
+
+ override def visit(call: CallExpression): PlannerExpression = {
+ translateCall(call.getFunctionDefinition, call.getChildren.asScala)
+ }
+
+ override def visit(unresolvedCall: UnresolvedCallExpression): PlannerExpression = {
+ translateCall(unresolvedCall.getFunctionDefinition, unresolvedCall.getChildren.asScala)
+ }
+
+ private def translateCall(
+ func: FunctionDefinition,
+ children: Seq[Expression])
+ : PlannerExpression = {
+
+ // special case: requires individual handling of child expressions
+ func match {
+ case CAST =>
+ assert(children.size == 2)
+ return Cast(
+ children.head.accept(this),
+ fromDataTypeToLegacyInfo(
+ children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
+
+ case WINDOW_START =>
+ assert(children.size == 1)
+ val windowReference = translateWindowReference(children.head)
+ return WindowStart(windowReference)
+
+ case WINDOW_END =>
+ assert(children.size == 1)
+ val windowReference = translateWindowReference(children.head)
+ return WindowEnd(windowReference)
+
+ case PROCTIME =>
+ assert(children.size == 1)
+ val windowReference = translateWindowReference(children.head)
+ return ProctimeAttribute(windowReference)
+
+ case ROWTIME =>
+ assert(children.size == 1)
+ val windowReference = translateWindowReference(children.head)
+ return RowtimeAttribute(windowReference)
+
+ case _ =>
+ }
+
+ val args = children.map(_.accept(this))
+
+ func match {
+ case sfd: ScalarFunctionDefinition =>
+ val call = PlannerScalarFunctionCall(
+ sfd.getScalarFunction,
+ args)
+ //it configures underlying state
+ call.validateInput()
+ call
+
+ case tfd: TableFunctionDefinition =>
+ PlannerTableFunctionCall(
+ tfd.toString,
+ tfd.getTableFunction,
+ args,
+ tfd.getResultType)
+
+ case afd: AggregateFunctionDefinition =>
+ AggFunctionCall(
+ afd.getAggregateFunction,
+ afd.getResultTypeInfo,
+ afd.getAccumulatorTypeInfo,
+ args)
+
+ case tafd: TableAggregateFunctionDefinition =>
+ AggFunctionCall(
+ tafd.getTableAggregateFunction,
+ tafd.getResultTypeInfo,
+ tafd.getAccumulatorTypeInfo,
+ args)
+
+ case fd: FunctionDefinition =>
+ fd match {
+
+ case AS =>
+ assert(args.size >= 2)
+ val name = getValue[String](args(1))
+ val extraNames = args
+ .drop(2)
+ .map(e => getValue[String](e))
+ Alias(args.head, name, extraNames)
+
+ case FLATTEN =>
+ assert(args.size == 1)
+ Flattening(args.head)
+
+ case GET =>
+ assert(args.size == 2)
+ val expr = GetCompositeField(args.head, getValue(args.last))
+ //it configures underlying state
+ expr.validateInput()
+ expr
+
+ case AND =>
+ assert(args.size == 2)
+ And(args.head, args.last)
+
+ case OR =>
+ assert(args.size == 2)
+ Or(args.head, args.last)
+
+ case NOT =>
+ assert(args.size == 1)
+ Not(args.head)
+
+ case EQUALS =>
+ assert(args.size == 2)
+ EqualTo(args.head, args.last)
+
+ case GREATER_THAN =>
+ assert(args.size == 2)
+ GreaterThan(args.head, args.last)
+
+ case GREATER_THAN_OR_EQUAL =>
+ assert(args.size == 2)
+ GreaterThanOrEqual(args.head, args.last)
+
+ case LESS_THAN =>
+ assert(args.size == 2)
+ LessThan(args.head, args.last)
+
+ case LESS_THAN_OR_EQUAL =>
+ assert(args.size == 2)
+ LessThanOrEqual(args.head, args.last)
+
+ case NOT_EQUALS =>
+ assert(args.size == 2)
+ NotEqualTo(args.head, args.last)
+
+ case IN =>
+ assert(args.size > 1)
+ In(args.head, args.drop(1))
+
+ case IS_NULL =>
+ assert(args.size == 1)
+ IsNull(args.head)
+
+ case IS_NOT_NULL =>
+ assert(args.size == 1)
+ IsNotNull(args.head)
+
+ case IS_TRUE =>
+ assert(args.size == 1)
+ IsTrue(args.head)
+
+ case IS_FALSE =>
+ assert(args.size == 1)
+ IsFalse(args.head)
+
+ case IS_NOT_TRUE =>
+ assert(args.size == 1)
+ IsNotTrue(args.head)
+
+ case IS_NOT_FALSE =>
+ assert(args.size == 1)
+ IsNotFalse(args.head)
+
+ case IF =>
+ assert(args.size == 3)
+ If(args.head, args(1), args.last)
+
+ case BETWEEN =>
+ assert(args.size == 3)
+ Between(args.head, args(1), args.last)
+
+ case NOT_BETWEEN =>
+ assert(args.size == 3)
+ NotBetween(args.head, args(1), args.last)
+
+ case DISTINCT =>
+ assert(args.size == 1)
+ DistinctAgg(args.head)
+
+ case AVG =>
+ assert(args.size == 1)
+ Avg(args.head)
+
+ case COUNT =>
+ assert(args.size == 1)
+ Count(args.head)
+
+ case MAX =>
+ assert(args.size == 1)
+ Max(args.head)
+
+ case MIN =>
+ assert(args.size == 1)
+ Min(args.head)
+
+ case SUM =>
+ assert(args.size == 1)
+ Sum(args.head)
+
+ case SUM0 =>
+ assert(args.size == 1)
+ Sum0(args.head)
+
+ case STDDEV_POP =>
+ assert(args.size == 1)
+ StddevPop(args.head)
+
+ case STDDEV_SAMP =>
+ assert(args.size == 1)
+ StddevSamp(args.head)
+
+ case VAR_POP =>
+ assert(args.size == 1)
+ VarPop(args.head)
+
+ case VAR_SAMP =>
+ assert(args.size == 1)
+ VarSamp(args.head)
+
+ case COLLECT =>
+ assert(args.size == 1)
+ Collect(args.head)
+
+ case CHAR_LENGTH =>
+ assert(args.size == 1)
+ CharLength(args.head)
+
+ case INIT_CAP =>
+ assert(args.size == 1)
+ InitCap(args.head)
+
+ case LIKE =>
+ assert(args.size == 2)
+ Like(args.head, args.last)
+
+ case LOWER =>
+ assert(args.size == 1)
+ Lower(args.head)
+
+ case SIMILAR =>
+ assert(args.size == 2)
+ Similar(args.head, args.last)
+
+ case SUBSTRING =>
+ assert(args.size == 2 || args.size == 3)
+ if (args.size == 2) {
+ new Substring(args.head, args.last)
+ } else {
+ Substring(args.head, args(1), args.last)
+ }
+
+ case REPLACE =>
+ assert(args.size == 2 || args.size == 3)
+ if (args.size == 2) {
+ new Replace(args.head, args.last)
+ } else {
+ Replace(args.head, args(1), args.last)
+ }
+
+ case TRIM =>
+ assert(args.size == 4)
+ val removeLeading = getValue[Boolean](args.head)
+ val removeTrailing = getValue[Boolean](args(1))
+
+ val trimMode = if (removeLeading && removeTrailing) {
+ PlannerTrimMode.BOTH
+ } else if (removeLeading) {
+ PlannerTrimMode.LEADING
+ } else if (removeTrailing) {
+ PlannerTrimMode.TRAILING
+ } else {
+ throw new TableException("Unsupported trim mode.")
+ }
+ Trim(trimMode, args(2), args(3))
+
+ case UPPER =>
+ assert(args.size == 1)
+ Upper(args.head)
+
+ case POSITION =>
+ assert(args.size == 2)
+ Position(args.head, args.last)
+
+ case OVERLAY =>
+ assert(args.size == 3 || args.size == 4)
+ if (args.size == 3) {
+ new Overlay(args.head, args(1), args.last)
+ } else {
+ Overlay(
+ args.head,
+ args(1),
+ args(2),
+ args.last)
+ }
+
+ case CONCAT =>
+ Concat(args)
+
+ case CONCAT_WS =>
+ assert(args.nonEmpty)
+ ConcatWs(args.head, args.tail)
+
+ case LPAD =>
+ assert(args.size == 3)
+ Lpad(args.head, args(1), args.last)
+
+ case RPAD =>
+ assert(args.size == 3)
+ Rpad(args.head, args(1), args.last)
+
+ case REGEXP_EXTRACT =>
+ assert(args.size == 2 || args.size == 3)
+ if (args.size == 2) {
+ RegexpExtract(args.head, args.last)
+ } else {
+ RegexpExtract(args.head, args(1), args.last)
+ }
+
+ case FROM_BASE64 =>
+ assert(args.size == 1)
+ FromBase64(args.head)
+
+ case TO_BASE64 =>
+ assert(args.size == 1)
+ ToBase64(args.head)
+
+ case BuiltInFunctionDefinitions.UUID =>
+ assert(args.isEmpty)
+ PlannerUUID()
+
+ case LTRIM =>
+ assert(args.size == 1)
+ LTrim(args.head)
+
+ case RTRIM =>
+ assert(args.size == 1)
+ RTrim(args.head)
+
+ case REPEAT =>
+ assert(args.size == 2)
+ Repeat(args.head, args.last)
+
+ case REGEXP_REPLACE =>
+ assert(args.size == 3)
+ RegexpReplace(args.head, args(1), args.last)
+
+ case PLUS =>
+ assert(args.size == 2)
+ Plus(args.head, args.last)
+
+ case MINUS =>
+ assert(args.size == 2)
+ Minus(args.head, args.last)
+
+ case DIVIDE =>
+ assert(args.size == 2)
+ Div(args.head, args.last)
+
+ case TIMES =>
+ assert(args.size == 2)
+ Mul(args.head, args.last)
+
+ case ABS =>
+ assert(args.size == 1)
+ Abs(args.head)
+
+ case CEIL =>
+ assert(args.size == 1 || args.size == 2)
+ if (args.size == 1) {
+ Ceil(args.head)
+ } else {
+ TemporalCeil(args.head, args.last)
+ }
+
+ case EXP =>
+ assert(args.size == 1)
+ Exp(args.head)
+
+ case FLOOR =>
+ assert(args.size == 1 || args.size == 2)
+ if (args.size == 1) {
+ Floor(args.head)
+ } else {
+ TemporalFloor(args.head, args.last)
+ }
+
+ case LOG10 =>
+ assert(args.size == 1)
+ Log10(args.head)
+
+ case LOG2 =>
+ assert(args.size == 1)
+ Log2(args.head)
+
+ case LN =>
+ assert(args.size == 1)
+ Ln(args.head)
+
+ case LOG =>
+ assert(args.size == 1 || args.size == 2)
+ if (args.size == 1) {
+ Log(args.head)
+ } else {
+ Log(args.head, args.last)
+ }
+
+ case POWER =>
+ assert(args.size == 2)
+ Power(args.head, args.last)
+
+ case MOD =>
+ assert(args.size == 2)
+ Mod(args.head, args.last)
+
+ case SQRT =>
+ assert(args.size == 1)
+ Sqrt(args.head)
+
+ case MINUS_PREFIX =>
+ assert(args.size == 1)
+ UnaryMinus(args.head)
+
+ case SIN =>
+ assert(args.size == 1)
+ Sin(args.head)
+
+ case COS =>
+ assert(args.size == 1)
+ Cos(args.head)
+
+ case SINH =>
+ assert(args.size == 1)
+ Sinh(args.head)
+
+ case TAN =>
+ assert(args.size == 1)
+ Tan(args.head)
+
+ case TANH =>
+ assert(args.size == 1)
+ Tanh(args.head)
+
+ case COT =>
+ assert(args.size == 1)
+ Cot(args.head)
+
+ case ASIN =>
+ assert(args.size == 1)
+ Asin(args.head)
+
+ case ACOS =>
+ assert(args.size == 1)
+ Acos(args.head)
+
+ case ATAN =>
+ assert(args.size == 1)
+ Atan(args.head)
+
+ case ATAN2 =>
+ assert(args.size == 2)
+ Atan2(args.head, args.last)
+
+ case COSH =>
+ assert(args.size == 1)
+ Cosh(args.head)
+
+ case DEGREES =>
+ assert(args.size == 1)
+ Degrees(args.head)
+
+ case RADIANS =>
+ assert(args.size == 1)
+ Radians(args.head)
+
+ case SIGN =>
+ assert(args.size == 1)
+ Sign(args.head)
+
+ case ROUND =>
+ assert(args.size == 2)
+ Round(args.head, args.last)
+
+ case PI =>
+ assert(args.isEmpty)
+ Pi()
+
+ case BuiltInFunctionDefinitions.E =>
+ assert(args.isEmpty)
+ PlannerE()
+
+ case RAND =>
+ assert(args.isEmpty || args.size == 1)
+ if (args.isEmpty) {
+ new Rand()
+ } else {
+ Rand(args.head)
+ }
+
+ case RAND_INTEGER =>
+ assert(args.size == 1 || args.size == 2)
+ if (args.size == 1) {
+ new RandInteger(args.head)
+ } else {
+ RandInteger(args.head, args.last)
+ }
+
+ case BIN =>
+ assert(args.size == 1)
+ Bin(args.head)
+
+ case HEX =>
+ assert(args.size == 1)
+ Hex(args.head)
+
+ case TRUNCATE =>
+ assert(args.size == 1 || args.size == 2)
+ if (args.size == 1) {
+ new Truncate(args.head)
+ } else {
+ Truncate(args.head, args.last)
+ }
+
+ case EXTRACT =>
+ assert(args.size == 2)
+ Extract(args.head, args.last)
+
+ case CURRENT_DATE =>
+ assert(args.isEmpty)
+ CurrentDate()
+
+ case CURRENT_TIME =>
+ assert(args.isEmpty)
+ CurrentTime()
+
+ case CURRENT_TIMESTAMP =>
+ assert(args.isEmpty)
+ CurrentTimestamp()
+
+ case LOCAL_TIME =>
+ assert(args.isEmpty)
+ LocalTime()
+
+ case LOCAL_TIMESTAMP =>
+ assert(args.isEmpty)
+ LocalTimestamp()
+
+ case TEMPORAL_OVERLAPS =>
+ assert(args.size == 4)
+ TemporalOverlaps(
+ args.head,
+ args(1),
+ args(2),
+ args.last)
+
+ case DATE_TIME_PLUS =>
+ assert(args.size == 2)
+ Plus(args.head, args.last)
+
+ case DATE_FORMAT =>
+ assert(args.size == 2)
+ DateFormat(args.head, args.last)
+
+ case TIMESTAMP_DIFF =>
+ assert(args.size == 3)
+ TimestampDiff(args.head, args(1), args.last)
+
+ case AT =>
+ assert(args.size == 2)
+ ItemAt(args.head, args.last)
+
+ case CARDINALITY =>
+ assert(args.size == 1)
+ Cardinality(args.head)
+
+ case ARRAY =>
+ ArrayConstructor(args)
+
+ case ARRAY_ELEMENT =>
+ assert(args.size == 1)
+ ArrayElement(args.head)
+
+ case MAP =>
+ MapConstructor(args)
+
+ case ROW =>
+ RowConstructor(args)
+
+ case ORDER_ASC =>
+ assert(args.size == 1)
+ Asc(args.head)
+
+ case ORDER_DESC =>
+ assert(args.size == 1)
+ Desc(args.head)
+
+ case MD5 =>
+ assert(args.size == 1)
+ Md5(args.head)
+
+ case SHA1 =>
+ assert(args.size == 1)
+ Sha1(args.head)
+
+ case SHA224 =>
+ assert(args.size == 1)
+ Sha224(args.head)
+
+ case SHA256 =>
+ assert(args.size == 1)
+ Sha256(args.head)
+
+ case SHA384 =>
+ assert(args.size == 1)
+ Sha384(args.head)
+
+ case SHA512 =>
+ assert(args.size == 1)
+ Sha512(args.head)
+
+ case SHA2 =>
+ assert(args.size == 2)
+ Sha2(args.head, args.last)
+
+ case OVER =>
+ assert(args.size >= 4)
+ OverCall(
+ args.head,
+ args.slice(4, args.size),
+ args(1),
+ args(2),
+ args(3)
+ )
+
+ case UNBOUNDED_RANGE =>
+ assert(args.isEmpty)
+ UnboundedRange()
+
+ case UNBOUNDED_ROW =>
+ assert(args.isEmpty)
+ UnboundedRow()
+
+ case CURRENT_RANGE =>
+ assert(args.isEmpty)
+ CurrentRange()
+
+ case CURRENT_ROW =>
+ assert(args.isEmpty)
+ CurrentRow()
+
+ case _ =>
+ throw new TableException(s"Unsupported function definition: $fd")
+ }
+ }
+ }
+
+ /**
+  * Converts a value literal into its planner counterpart.
+  *
+  * Literals whose logical type has the SYMBOL root become a [[SymbolPlannerExpression]];
+  * null literals become [[Null]]; everything else becomes a [[Literal]] carrying the value
+  * read as the class reported by the resolved legacy type information.
+  */
+ override def visit(literal: ValueLiteralExpression): PlannerExpression = {
+   val isSymbol = hasRoot(literal.getOutputDataType.getLogicalType, SYMBOL)
+   if (isSymbol) {
+     val tableSymbol = literal.getValueAs(classOf[TableSymbol]).get()
+     SymbolPlannerExpression(getSymbol(tableSymbol))
+   } else {
+     val legacyType = getLiteralTypeInfo(literal)
+     if (literal.isNull) {
+       Null(legacyType)
+     } else {
+       Literal(literal.getValueAs(legacyType.getTypeClass).get(), legacyType)
+     }
+   }
+ }
+
+ /**
+  * Resolves the legacy type information used for a literal.
+  *
+  * This method makes the planner more lenient for new data types defined for literals:
+  *  - DECIMAL literals map to `Types.BIG_DEC` when the literal is null or when the declared
+  *    precision and scale equal those of the actual value,
+  *  - CHAR literals map to `Types.STRING` when the literal is null or when the declared
+  *    length equals the length of the actual value,
+  *  - TIMESTAMP_WITHOUT_TIME_ZONE literals with a precision of at most 3 map to
+  *    `Types.SQL_TIMESTAMP`.
+  *
+  * Every other case falls back to `fromDataTypeToLegacyInfo`.
+  */
+ private def getLiteralTypeInfo(literal: ValueLiteralExpression): TypeInformation[_] = {
+   val logicalType = literal.getOutputDataType.getLogicalType
+
+   // only evaluated for non-null DECIMAL literals
+   def decimalMatches: Boolean = {
+     val value = literal.getValueAs(classOf[java.math.BigDecimal]).get()
+     hasPrecision(logicalType, value.precision()) && hasScale(logicalType, value.scale())
+   }
+
+   // only evaluated for non-null CHAR literals
+   def charMatches: Boolean = {
+     val value = literal.getValueAs(classOf[java.lang.String]).get()
+     hasLength(logicalType, value.length)
+   }
+
+   val lenientType: Option[TypeInformation[_]] =
+     if (hasRoot(logicalType, DECIMAL)) {
+       if (literal.isNull || decimalMatches) Some(Types.BIG_DEC) else None
+     } else if (hasRoot(logicalType, CHAR)) {
+       if (literal.isNull || charMatches) Some(Types.STRING) else None
+     } else if (hasRoot(logicalType, TIMESTAMP_WITHOUT_TIME_ZONE)) {
+       if (getPrecision(logicalType) <= 3) Some(Types.SQL_TIMESTAMP) else None
+     } else {
+       None
+     }
+
+   lenientType match {
+     case Some(typeInfo) => typeInfo
+     case None => fromDataTypeToLegacyInfo(literal.getOutputDataType)
+   }
+ }
+
+ /**
+  * Maps an API-level [[TableSymbol]] to the planner symbol of the same name.
+  *
+  * Only the [[TimeIntervalUnit]] and [[TimePointUnit]] constants listed below are
+  * supported; any other symbol results in a [[TableException]].
+  */
+ private def getSymbol(symbol: TableSymbol): PlannerSymbol = {
+   symbol match {
+     // time interval units
+     case TimeIntervalUnit.YEAR => PlannerTimeIntervalUnit.YEAR
+     case TimeIntervalUnit.YEAR_TO_MONTH => PlannerTimeIntervalUnit.YEAR_TO_MONTH
+     case TimeIntervalUnit.QUARTER => PlannerTimeIntervalUnit.QUARTER
+     case TimeIntervalUnit.MONTH => PlannerTimeIntervalUnit.MONTH
+     case TimeIntervalUnit.WEEK => PlannerTimeIntervalUnit.WEEK
+     case TimeIntervalUnit.DAY => PlannerTimeIntervalUnit.DAY
+     case TimeIntervalUnit.DAY_TO_HOUR => PlannerTimeIntervalUnit.DAY_TO_HOUR
+     case TimeIntervalUnit.DAY_TO_MINUTE => PlannerTimeIntervalUnit.DAY_TO_MINUTE
+     case TimeIntervalUnit.DAY_TO_SECOND => PlannerTimeIntervalUnit.DAY_TO_SECOND
+     case TimeIntervalUnit.HOUR => PlannerTimeIntervalUnit.HOUR
+     case TimeIntervalUnit.HOUR_TO_MINUTE => PlannerTimeIntervalUnit.HOUR_TO_MINUTE
+     case TimeIntervalUnit.HOUR_TO_SECOND => PlannerTimeIntervalUnit.HOUR_TO_SECOND
+     case TimeIntervalUnit.MINUTE => PlannerTimeIntervalUnit.MINUTE
+     case TimeIntervalUnit.MINUTE_TO_SECOND => PlannerTimeIntervalUnit.MINUTE_TO_SECOND
+     case TimeIntervalUnit.SECOND => PlannerTimeIntervalUnit.SECOND
+
+     // time point units
+     case TimePointUnit.YEAR => PlannerTimePointUnit.YEAR
+     case TimePointUnit.QUARTER => PlannerTimePointUnit.QUARTER
+     case TimePointUnit.MONTH => PlannerTimePointUnit.MONTH
+     case TimePointUnit.WEEK => PlannerTimePointUnit.WEEK
+     case TimePointUnit.DAY => PlannerTimePointUnit.DAY
+     case TimePointUnit.HOUR => PlannerTimePointUnit.HOUR
+     case TimePointUnit.MINUTE => PlannerTimePointUnit.MINUTE
+     case TimePointUnit.SECOND => PlannerTimePointUnit.SECOND
+     case TimePointUnit.MILLISECOND => PlannerTimePointUnit.MILLISECOND
+     case TimePointUnit.MICROSECOND => PlannerTimePointUnit.MICROSECOND
+
+     case unsupported =>
+       throw new TableException(s"Unsupported symbol: $unsupported")
+   }
+ }
+
+ /**
+  * Converts a resolved field reference, keeping its name and translating its output
+  * data type to legacy type information.
+  */
+ override def visit(fieldReference: FieldReferenceExpression): PlannerExpression = {
+   val fieldName = fieldReference.getName
+   val legacyType = fromDataTypeToLegacyInfo(fieldReference.getOutputDataType)
+   PlannerResolvedFieldReference(fieldName, legacyType)
+ }
+
+ /** Converts an unresolved reference into an [[UnresolvedFieldReference]] of the same name. */
+ override def visit(fieldReference: UnresolvedReferenceExpression): PlannerExpression =
+   UnresolvedFieldReference(fieldReference.getName)
+
+ /** Type literals are not supported by this visit method; it always throws. */
+ override def visit(typeLiteral: TypeLiteralExpression): PlannerExpression =
+   throw new TableException(s"Unsupported type literal expression: $typeLiteral")
+
+ /**
+  * Converts a table reference into a planner [[TableReference]] that carries the
+  * referenced table's name and its query operation.
+  */
+ override def visit(tableRef: TableReferenceExpression): PlannerExpression = {
+   // `tableRef` is already statically typed, so no cast is needed
+   TableReference(tableRef.getName, tableRef.getQueryOperation)
+ }
+
+ /**
+  * Local references are not converted here; the call that owns them is expected to
+  * handle them, so reaching this method is an error.
+  */
+ override def visit(localReference: LocalReferenceExpression): PlannerExpression = {
+   val message = s"Local reference should be handled individually by a call: $localReference"
+   throw new TableException(message)
+ }
+
+ /** Lookup calls are not supported by this converter; it always throws. */
+ override def visit(lookupCall: LookupCallExpression): PlannerExpression = {
+   throw new TableException(s"Unsupported function call: $lookupCall")
+ }
+
+ /**
+  * Handles expressions that are not part of the API expression hierarchy.
+  *
+  * Expressions that already are planner expressions pass through unchanged; anything
+  * else is rejected with a [[TableException]].
+  */
+ override def visitNonApiExpression(other: Expression): PlannerExpression = other match {
+   case alreadyConverted: PlannerExpression => alreadyConverted
+   case unknown => throw new TableException(s"Unrecognized expression: $unknown")
+ }
+
+ /**
+  * Extracts the value of a planner [[Literal]] and casts it to `T`.
+  *
+  * The cast to `T` is unchecked (erased), so a wrong `T` only surfaces where the
+  * result is used.
+  *
+  * @param literal expression that is expected to be a [[Literal]]
+  * @throws TableException if the given expression is not a [[Literal]]
+  */
+ private def getValue[T](literal: PlannerExpression): T = literal match {
+   case l: Literal => l.value.asInstanceOf[T]
+   case other =>
+     // fail with a descriptive message instead of a bare ClassCastException
+     throw new TableException(s"Expected a literal argument but got: $other")
+ }
+
+ /**
+  * Argument-count check used by the function call conversion.
+  *
+  * Note that this shadows `Predef.assert` inside the class and throws a
+  * [[ValidationException]] rather than an `AssertionError`.
+  */
+ private def assert(condition: Boolean): Unit = condition match {
+   case true => ()
+   case false => throw new ValidationException("Invalid number of arguments for function.")
+ }
+
+ /**
+  * Translates the expression that refers to a window.
+  *
+  * A local reference becomes a typed [[WindowReference]]. An unresolved reference is
+  * passed through as an [[UnresolvedFieldReference]]; according to the original note
+  * this is needed because of how a DataStream is converted to a table. Any other
+  * expression is rejected with a [[ValidationException]].
+  */
+ private def translateWindowReference(reference: Expression): PlannerExpression = {
+   reference match {
+     case local: LocalReferenceExpression =>
+       val windowName = local.getName
+       val windowType = fromDataTypeToLegacyInfo(local.getOutputDataType)
+       WindowReference(windowName, Some(windowType))
+
+     case unresolved: UnresolvedReferenceExpression =>
+       UnresolvedFieldReference(unresolved.getName)
+
+     case _ =>
+       throw new ValidationException(s"Expected LocalReferenceExpression. Got: $reference")
+   }
+ }
+}
+
+/**
+ * Companion holding a shared converter instance.
+ */
+object PlannerExpressionConverter {
+
+  /** Shared instance of the converter, created when this object is initialized. */
+  val INSTANCE: PlannerExpressionConverter = new PlannerExpressionConverter()
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
new file mode 100644
index 0000000..9f71209
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
@@ -0,0 +1,726 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import _root_.java.math.{BigDecimal => JBigDecimal}
+import _root_.java.util.{List => JList}
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api._
+import org.apache.flink.table.delegation.PlannerExpressionParser
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+
+import _root_.scala.collection.JavaConversions._
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+
+/**
+ * The implementation of a [[PlannerExpressionParser]] which parses expressions inside a String.
+ */
+class PlannerExpressionParserImpl extends PlannerExpressionParser {
+
+  /** Parses a single expression by delegating to the companion object. */
+  def parseExpression(exprString: String): Expression =
+    PlannerExpressionParserImpl.parseExpression(exprString)
+
+  /** Parses a list of expressions by delegating to the companion object. */
+  override def parseExpressionList(expression: String): JList[Expression] =
+    PlannerExpressionParserImpl.parseExpressionList(expression)
+}
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * See ImplicitExpressionConversions and ImplicitExpressionOperations for the constructs
+ * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
+ * defined in the above files.
+ */
+object PlannerExpressionParserImpl extends JavaTokenParsers
+ with PackratParsers
+ with PlannerExpressionParser {
+
+ case class Keyword(key: String)
+
+ // Converts a keyword into a case-insensitive parser.
+ // The keyword text is quoted (\Q...\E) so that it is matched literally, and the negative
+ // lookahead ensures that the keyword is not matched as a prefix, i.e., the keyword must
+ // not be followed by a Java identifier character.
+ // The resulting parser yields the matched input text as it was written.
+ implicit def keyword2Parser(kw: Keyword): Parser[String] = {
+   val quotedKeyword = """(?i)\Q""" + kw.key + """\E"""
+   val notAnIdentifierPrefix = """(?![_$\p{javaJavaIdentifierPart}])"""
+   (quotedKeyword + notAnIdentifierPrefix).r
+ }
+
+ // Keyword
+ lazy val AS: Keyword = Keyword("as")
+ lazy val CAST: Keyword = Keyword("cast")
+ lazy val ASC: Keyword = Keyword("asc")
+ lazy val DESC: Keyword = Keyword("desc")
+ lazy val NULL: Keyword = Keyword("Null")
+ lazy val NULL_OF: Keyword = Keyword("nullOf")
+ lazy val IF: Keyword = Keyword("?")
+ lazy val TO_DATE: Keyword = Keyword("toDate")
+ lazy val TO_TIME: Keyword = Keyword("toTime")
+ lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
+ lazy val TRIM: Keyword = Keyword("trim")
+ lazy val EXTRACT: Keyword = Keyword("extract")
+ lazy val TIMESTAMP_DIFF: Keyword = Keyword("timestampDiff")
+ lazy val FLOOR: Keyword = Keyword("floor")
+ lazy val CEIL: Keyword = Keyword("ceil")
+ lazy val LOG: Keyword = Keyword("log")
+ lazy val YEARS: Keyword = Keyword("years")
+ lazy val YEAR: Keyword = Keyword("year")
+ lazy val QUARTERS: Keyword = Keyword("quarters")
+ lazy val QUARTER: Keyword = Keyword("quarter")
+ lazy val MONTHS: Keyword = Keyword("months")
+ lazy val MONTH: Keyword = Keyword("month")
+ lazy val WEEKS: Keyword = Keyword("weeks")
+ lazy val WEEK: Keyword = Keyword("week")
+ lazy val DAYS: Keyword = Keyword("days")
+ lazy val DAY: Keyword = Keyword("day")
+ lazy val HOURS: Keyword = Keyword("hours")
+ lazy val HOUR: Keyword = Keyword("hour")
+ lazy val MINUTES: Keyword = Keyword("minutes")
+ lazy val MINUTE: Keyword = Keyword("minute")
+ lazy val SECONDS: Keyword = Keyword("seconds")
+ lazy val SECOND: Keyword = Keyword("second")
+ lazy val MILLIS: Keyword = Keyword("millis")
+ lazy val MILLI: Keyword = Keyword("milli")
+ lazy val ROWS: Keyword = Keyword("rows")
+ lazy val STAR: Keyword = Keyword("*")
+ lazy val GET: Keyword = Keyword("get")
+ lazy val FLATTEN: Keyword = Keyword("flatten")
+ lazy val OVER: Keyword = Keyword("over")
+ lazy val DISTINCT: Keyword = Keyword("distinct")
+ lazy val CURRENT_ROW: Keyword = Keyword("current_row")
+ lazy val CURRENT_RANGE: Keyword = Keyword("current_range")
+ lazy val UNBOUNDED_ROW: Keyword = Keyword("unbounded_row")
+ lazy val UNBOUNDED_RANGE: Keyword = Keyword("unbounded_range")
+ lazy val ROWTIME: Keyword = Keyword("rowtime")
+ lazy val PROCTIME: Keyword = Keyword("proctime")
+ lazy val TRUE: Keyword = Keyword("true")
+ lazy val FALSE: Keyword = Keyword("false")
+ lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY")
+ lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY")
+ lazy val MAP: Keyword = Keyword("MAP")
+ lazy val BYTE: Keyword = Keyword("BYTE")
+ lazy val SHORT: Keyword = Keyword("SHORT")
+ lazy val INTERVAL_MONTHS: Keyword = Keyword("INTERVAL_MONTHS")
+ lazy val INTERVAL_MILLIS: Keyword = Keyword("INTERVAL_MILLIS")
+ lazy val INT: Keyword = Keyword("INT")
+ lazy val LONG: Keyword = Keyword("LONG")
+ lazy val FLOAT: Keyword = Keyword("FLOAT")
+ lazy val DOUBLE: Keyword = Keyword("DOUBLE")
+ lazy val BOOLEAN: Keyword = Keyword("BOOLEAN")
+ lazy val STRING: Keyword = Keyword("STRING")
+ lazy val SQL_DATE: Keyword = Keyword("SQL_DATE")
+ lazy val SQL_TIMESTAMP: Keyword = Keyword("SQL_TIMESTAMP")
+ lazy val SQL_TIME: Keyword = Keyword("SQL_TIME")
+ lazy val DECIMAL: Keyword = Keyword("DECIMAL")
+ lazy val TRIM_MODE_LEADING: Keyword = Keyword("LEADING")
+ lazy val TRIM_MODE_TRAILING: Keyword = Keyword("TRAILING")
+ lazy val TRIM_MODE_BOTH: Keyword = Keyword("BOTH")
+ lazy val TO: Keyword = Keyword("TO")
+
+ def functionIdent: PlannerExpressionParserImpl.Parser[String] = super.ident
+
+ // symbols
+
+ lazy val timeIntervalUnit: PackratParser[Expression] = TimeIntervalUnit.values map {
+ unit: TimeIntervalUnit => literal(unit.toString) ^^^ valueLiteral(unit)
+ } reduceLeft(_ | _)
+
+ lazy val timePointUnit: PackratParser[Expression] = TimePointUnit.values map {
+ unit: TimePointUnit => literal(unit.toString) ^^^ valueLiteral(unit)
+ } reduceLeft(_ | _)
+
+ lazy val currentRange: PackratParser[Expression] = CURRENT_RANGE ^^ {
+ _ => unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE)
+ }
+
+ lazy val currentRow: PackratParser[Expression] = CURRENT_ROW ^^ {
+ _ => unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW)
+ }
+
+ lazy val unboundedRange: PackratParser[Expression] = UNBOUNDED_RANGE ^^ {
+ _ => unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE)
+ }
+
+ lazy val unboundedRow: PackratParser[Expression] = UNBOUNDED_ROW ^^ {
+ _ => unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_ROW)
+ }
+
+ lazy val overConstant: PackratParser[Expression] =
+ currentRange | currentRow | unboundedRange | unboundedRow
+
+ lazy val trimMode: PackratParser[String] =
+ TRIM_MODE_LEADING | TRIM_MODE_TRAILING | TRIM_MODE_BOTH
+
+ // data types
+
+ // Parses a type name into legacy type information. Array and map types are parsed
+ // recursively. The order of the alternatives is kept as is.
+ lazy val dataType: PackratParser[TypeInformation[_]] =
+   PRIMITIVE_ARRAY ~ "(" ~> dataType <~ ")" ^^ { elementType =>
+     Types.PRIMITIVE_ARRAY(elementType)
+   } |
+   OBJECT_ARRAY ~ "(" ~> dataType <~ ")" ^^ { elementType =>
+     Types.OBJECT_ARRAY(elementType)
+   } |
+   MAP ~ "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ {
+     case keyType ~ _ ~ valueType => Types.MAP(keyType, valueType)
+   } |
+   BYTE ^^ { _ => Types.BYTE } |
+   SHORT ^^ { _ => Types.SHORT } |
+   INTERVAL_MONTHS ^^ { _ => Types.INTERVAL_MONTHS } |
+   INTERVAL_MILLIS ^^ { _ => Types.INTERVAL_MILLIS } |
+   INT ^^ { _ => Types.INT } |
+   LONG ^^ { _ => Types.LONG } |
+   FLOAT ^^ { _ => Types.FLOAT } |
+   DOUBLE ^^ { _ => Types.DOUBLE } |
+   BOOLEAN ^^ { _ => Types.BOOLEAN } |
+   STRING ^^ { _ => Types.STRING } |
+   SQL_DATE ^^ { _ => Types.SQL_DATE } |
+   SQL_TIMESTAMP ^^ { _ => Types.SQL_TIMESTAMP } |
+   SQL_TIME ^^ { _ => Types.SQL_TIME } |
+   DECIMAL ^^ { _ => Types.DECIMAL }
+
+ // literals
+
+ // same as floatingPointNumber but we do not allow trailing dot "12.d" or "2."
+ lazy val floatingPointNumberFlink: Parser[String] =
+ """-?(\d+(\.\d+)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r
+
+ // Numeric literals:
+ //  - a whole number followed by "l"/"L" becomes a long literal,
+ //  - a decimal number followed by "p"/"P" becomes a java.math.BigDecimal literal,
+ //  - otherwise digits only (optionally signed) become an int literal, a trailing
+ //    "f"/"F" yields a float literal and everything else a double literal.
+ lazy val numberLiteral: PackratParser[Expression] =
+   (wholeNumber <~ ("l" | "L")) ^^ { digits => valueLiteral(digits.toLong) } |
+   (decimalNumber <~ ("p" | "P")) ^^ { digits => valueLiteral(new JBigDecimal(digits)) } |
+   (floatingPointNumberFlink | decimalNumber) ^^ {
+     case integral if integral.matches("""-?\d+""") =>
+       valueLiteral(integral.toInt)
+     case single if single.endsWith("f") || single.endsWith("F") =>
+       valueLiteral(single.toFloat)
+     case other =>
+       valueLiteral(other.toDouble)
+   }
+
+ // string with single quotes such as 'It''s me.'
+ lazy val singleQuoteStringLiteral: Parser[Expression] = "'(?:''|[^'])*'".r ^^ { quoted =>
+   // strip the surrounding quotes, then collapse each doubled quote into a single one
+   val inner = quoted.substring(1, quoted.length - 1)
+   valueLiteral(inner.replace("''", "'"))
+ }
+
+ // string with double quotes such as "I ""like"" dogs."
+ lazy val doubleQuoteStringLiteral: PackratParser[Expression] = "\"(?:\"\"|[^\"])*\"".r ^^ {
+   quoted =>
+     // strip the surrounding quotes, then collapse each doubled quote into a single one
+     val inner = quoted.substring(1, quoted.length - 1)
+     valueLiteral(inner.replace("\"\"", "\""))
+ }
+
+ // Boolean literal; the matched keyword text is converted with String#toBoolean.
+ lazy val boolLiteral: PackratParser[Expression] =
+   (TRUE | FALSE) ^^ { text => valueLiteral(text.toBoolean) }
+
+ lazy val nullLiteral: PackratParser[Expression] = (NULL | NULL_OF) ~ "(" ~> dataType <~ ")" ^^ {
+ dt => valueLiteral(null, fromLegacyInfoToDataType(dt))
+ }
+
+ lazy val literalExpr: PackratParser[Expression] =
+ numberLiteral | doubleQuoteStringLiteral | singleQuoteStringLiteral | boolLiteral
+
+ lazy val fieldReference: PackratParser[UnresolvedReferenceExpression] = (STAR | ident) ^^ {
+ sym => unresolvedRef(sym)
+ }
+
+ lazy val atom: PackratParser[Expression] =
+ ( "(" ~> expression <~ ")" ) | (fieldReference ||| literalExpr)
+
+ lazy val over: PackratParser[Expression] = composite ~ OVER ~ fieldReference ^^ {
+ case agg ~ _ ~ windowRef =>
+ unresolvedCall(BuiltInFunctionDefinitions.OVER, agg, windowRef)
+ }
+
+ // suffix operators
+
+ lazy val suffixAsc : PackratParser[Expression] = composite <~ "." ~ ASC ~ opt("()") ^^ { e =>
+ unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, e)
+ }
+
+ lazy val suffixDesc : PackratParser[Expression] = composite <~ "." ~ DESC ~ opt("()") ^^ { e =>
+ unresolvedCall(BuiltInFunctionDefinitions.ORDER_DESC, e)
+ }
+
+ lazy val suffixCast: PackratParser[Expression] =
+ composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
+ case e ~ _ ~ _ ~ _ ~ dt ~ _ =>
+ unresolvedCall(
+ BuiltInFunctionDefinitions.CAST,
+ e,
+ typeLiteral(fromLegacyInfoToDataType(dt)))
+ }
+
+ // Suffix form of TRIM with an explicit mode: operand.trim(MODE, trimCharacter).
+ // The call is built with two boolean flags (remove leading, remove trailing) followed
+ // by the trim character and the operand.
+ lazy val suffixTrim: PackratParser[Expression] =
+   composite ~ "." ~ TRIM ~ "(" ~ trimMode ~
+     "," ~ expression ~ ")" ^^ {
+     case operand ~ _ ~ _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ =>
+       // Keyword parsers match case-insensitively and return the text as written, so the
+       // mode must be compared ignoring case; otherwise e.g. "leading" sets neither flag.
+       val both = mode.equalsIgnoreCase(TRIM_MODE_BOTH.key)
+       val removeLeading = both || mode.equalsIgnoreCase(TRIM_MODE_LEADING.key)
+       val removeTrailing = both || mode.equalsIgnoreCase(TRIM_MODE_TRAILING.key)
+       unresolvedCall(
+         BuiltInFunctionDefinitions.TRIM,
+         valueLiteral(removeLeading),
+         valueLiteral(removeTrailing),
+         trimCharacter,
+         operand)
+   }
+
+ lazy val suffixTrimWithoutArgs: PackratParser[Expression] =
+ composite <~ "." ~ TRIM ~ opt("()") ^^ {
+ e =>
+ unresolvedCall(
+ BuiltInFunctionDefinitions.TRIM,
+ valueLiteral(true),
+ valueLiteral(true),
+ valueLiteral(" "),
+ e)
+ }
+
+ lazy val suffixIf: PackratParser[Expression] =
+ composite ~ "." ~ IF ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
+ case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.IF, condition, ifTrue, ifFalse)
+ }
+
+ lazy val suffixExtract: PackratParser[Expression] =
+ composite ~ "." ~ EXTRACT ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ unit ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.EXTRACT, unit, operand)
+ }
+
+ lazy val suffixFloor: PackratParser[Expression] =
+ composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ unit ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.FLOOR, unit, operand)
+ }
+
+ lazy val suffixCeil: PackratParser[Expression] =
+ composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ unit ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.CEIL, unit, operand)
+ }
+
+ // required because op.log(base) changes the order of the parameters
+ lazy val suffixLog: PackratParser[Expression] =
+ composite ~ "." ~ LOG ~ "(" ~ expression ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ base ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.LOG, base, operand)
+ }
+
+ lazy val suffixFunctionCall: PackratParser[Expression] =
+ composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+ case operand ~ _ ~ name ~ _ ~ args ~ _ =>
+ lookupCall(name, operand :: args: _*)
+ }
+
+ lazy val suffixFunctionCallOneArg: PackratParser[Expression] =
+ composite ~ "." ~ functionIdent ^^ {
+ case operand ~ _ ~ name =>
+ lookupCall(name, operand)
+ }
+
+ lazy val suffixToDate: PackratParser[Expression] =
+ composite <~ "." ~ TO_DATE ~ opt("()") ^^ { e =>
+ unresolvedCall(
+ BuiltInFunctionDefinitions.CAST,
+ e,
+ typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))
+ }
+
+ lazy val suffixToTimestamp: PackratParser[Expression] =
+ composite <~ "." ~ TO_TIMESTAMP ~ opt("()") ^^ { e =>
+ unresolvedCall(
+ BuiltInFunctionDefinitions.CAST,
+ e,
+ typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))
+ }
+
+ lazy val suffixToTime: PackratParser[Expression] =
+ composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e =>
+ unresolvedCall(
+ BuiltInFunctionDefinitions.CAST,
+ e,
+ typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))
+ }
+
+ // Suffix time interval such as 12.hours or 1.year.
+ // Year, quarter and month units produce month intervals; week and smaller units
+ // produce millisecond intervals.
+ lazy val suffixTimeInterval : PackratParser[Expression] =
+   composite ~ "." ~ (YEARS | QUARTERS | MONTHS | WEEKS | DAYS | HOURS | MINUTES |
+     SECONDS | MILLIS | YEAR | QUARTER | MONTH | WEEK | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
+
+     case expr ~ _ ~ unit =>
+       // Keyword parsers match case-insensitively and return the text as written. The unit
+       // keys are all lower case, so normalize before matching; otherwise an input such as
+       // "1.DAYS" parses successfully but fails here with a MatchError.
+       unit.toLowerCase(_root_.java.util.Locale.ROOT) match {
+
+         case YEARS.key | YEAR.key => toMonthInterval(expr, 12)
+
+         case QUARTERS.key | QUARTER.key => toMonthInterval(expr, 3)
+
+         case MONTHS.key | MONTH.key => toMonthInterval(expr, 1)
+
+         case WEEKS.key | WEEK.key => toMilliInterval(expr, 7 * MILLIS_PER_DAY)
+
+         case DAYS.key | DAY.key => toMilliInterval(expr, MILLIS_PER_DAY)
+
+         case HOURS.key | HOUR.key => toMilliInterval(expr, MILLIS_PER_HOUR)
+
+         case MINUTES.key | MINUTE.key => toMilliInterval(expr, MILLIS_PER_MINUTE)
+
+         case SECONDS.key | SECOND.key => toMilliInterval(expr, MILLIS_PER_SECOND)
+
+         case MILLIS.key | MILLI.key => toMilliInterval(expr, 1)
+       }
+   }
+
+ lazy val suffixRowInterval : PackratParser[Expression] =
+ composite <~ "." ~ ROWS ^^ { e => toRowInterval(e) }
+
+ lazy val suffixGet: PackratParser[Expression] =
+ composite ~ "." ~ GET ~ "(" ~ literalExpr ~ ")" ^^ {
+ case e ~ _ ~ _ ~ _ ~ index ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.GET, e, index)
+ }
+
+ lazy val suffixFlattening: PackratParser[Expression] =
+ composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e =>
+ unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, e)
+ }
+
+ lazy val suffixDistinct: PackratParser[Expression] =
+ composite <~ "." ~ DISTINCT ~ opt("()") ^^ { e =>
+ unresolvedCall(BuiltInFunctionDefinitions.DISTINCT, e)
+ }
+
+ lazy val suffixAs: PackratParser[Expression] =
+ composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+ case e ~ _ ~ _ ~ _ ~ names ~ _ =>
+ unresolvedCall(
+ BuiltInFunctionDefinitions.AS,
+ e :: names.map(n => valueLiteral(n.getName)): _*)
+ }
+
+ lazy val suffixed: PackratParser[Expression] =
+ // expressions that need to be resolved early
+ suffixFlattening |
+ // expressions that need special expression conversion
+ suffixAs | suffixTimeInterval | suffixRowInterval | suffixToTimestamp | suffixToTime |
+ suffixToDate |
+ // expression for log
+ suffixLog |
+ // expression for ordering
+ suffixAsc | suffixDesc |
+ // expressions that take enumerations
+ suffixCast | suffixTrim | suffixTrimWithoutArgs | suffixExtract | suffixFloor | suffixCeil |
+ // expressions that take literals
+ suffixGet |
+ // expression with special identifier
+ suffixIf |
+ // expression with distinct suffix modifier
+ suffixDistinct |
+ // function call must always be at the end
+ suffixFunctionCall | suffixFunctionCallOneArg |
+ // rowtime or proctime
+ timeIndicator
+
+ // prefix operators
+
+ lazy val prefixCast: PackratParser[Expression] =
+ CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
+ case _ ~ _ ~ e ~ _ ~ dt ~ _ =>
+ unresolvedCall(
+ BuiltInFunctionDefinitions.CAST,
+ e,
+ typeLiteral(fromLegacyInfoToDataType(dt)))
+ }
+
+ lazy val prefixIf: PackratParser[Expression] =
+ IF ~ "(" ~ expression ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+ case _ ~ _ ~ condition ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.IF, condition, ifTrue, ifFalse)
+ }
+
+ lazy val prefixFunctionCall: PackratParser[Expression] =
+ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+ case name ~ _ ~ args ~ _ =>
+ lookupCall(name, args: _*)
+ }
+
+ lazy val prefixFunctionCallOneArg: PackratParser[Expression] =
+ functionIdent ~ "(" ~ expression ~ ")" ^^ {
+ case name ~ _ ~ arg ~ _ =>
+ lookupCall(name, arg)
+ }
+
+ // Prefix form of TRIM with an explicit mode: trim(MODE, trimCharacter, operand).
+ // The call is built with two boolean flags (remove leading, remove trailing) followed
+ // by the trim character and the operand.
+ lazy val prefixTrim: PackratParser[Expression] =
+   TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+     case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ =>
+       // Keyword parsers match case-insensitively and return the text as written, so the
+       // mode must be compared ignoring case; otherwise e.g. "leading" sets neither flag.
+       val both = mode.equalsIgnoreCase(TRIM_MODE_BOTH.key)
+       val removeLeading = both || mode.equalsIgnoreCase(TRIM_MODE_LEADING.key)
+       val removeTrailing = both || mode.equalsIgnoreCase(TRIM_MODE_TRAILING.key)
+       unresolvedCall(
+         BuiltInFunctionDefinitions.TRIM,
+         valueLiteral(removeLeading),
+         valueLiteral(removeTrailing),
+         trimCharacter,
+         operand)
+   }
+
+ lazy val prefixTrimWithoutArgs: PackratParser[Expression] =
+ TRIM ~ "(" ~ expression ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ =>
+ unresolvedCall(
+ BuiltInFunctionDefinitions.TRIM,
+ valueLiteral(true),
+ valueLiteral(true),
+ valueLiteral(" "),
+ operand)
+ }
+
+ lazy val prefixExtract: PackratParser[Expression] =
+ EXTRACT ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ ~ unit ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.EXTRACT, unit, operand)
+ }
+
+ lazy val prefixTimestampDiff: PackratParser[Expression] =
+ TIMESTAMP_DIFF ~ "(" ~ timePointUnit ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+ case _ ~ _ ~ unit ~ _ ~ operand1 ~ _ ~ operand2 ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.TIMESTAMP_DIFF, unit, operand1, operand2)
+ }
+
+ lazy val prefixFloor: PackratParser[Expression] =
+ FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ ~ unit ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.FLOOR, unit, operand)
+ }
+
+ lazy val prefixCeil: PackratParser[Expression] =
+ CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ ~ unit ~ _ =>
+ unresolvedCall(BuiltInFunctionDefinitions.CEIL, unit, operand)
+ }
+
+ /**
+   * Parses `get(composite, literal)` into a GET call on the composite expression with the
+   * literal as second argument.
+   */
+ lazy val prefixGet: PackratParser[Expression] =
+   (GET ~ "(") ~> composite ~ ("," ~> literalExpr) <~ ")" ^^ {
+     case target ~ key =>
+       unresolvedCall(BuiltInFunctionDefinitions.GET, target, key)
+   }
+
+ /**
+   * Parses `flatten(composite)` into a FLATTEN call.
+   */
+ lazy val prefixFlattening: PackratParser[Expression] =
+   FLATTEN ~ "(" ~ composite ~ ")" ^^ {
+     case _ ~ _ ~ nested ~ _ =>
+       unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, nested)
+   }
+
+ /**
+   * Parses `toDate(expr)` and rewrites it as a CAST of the operand to the data type derived
+   * from `SqlTimeTypeInfo.DATE`.
+   */
+ lazy val prefixToDate: PackratParser[Expression] =
+   TO_DATE ~ "(" ~ expression ~ ")" ^^ {
+     case _ ~ _ ~ operand ~ _ =>
+       val targetType = typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE))
+       unresolvedCall(BuiltInFunctionDefinitions.CAST, operand, targetType)
+   }
+
+ /**
+   * Parses `toTimestamp(expr)` and rewrites it as a CAST of the operand to the data type
+   * derived from `SqlTimeTypeInfo.TIMESTAMP`.
+   */
+ lazy val prefixToTimestamp: PackratParser[Expression] =
+   TO_TIMESTAMP ~ "(" ~ expression ~ ")" ^^ {
+     case _ ~ _ ~ operand ~ _ =>
+       val targetType = typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP))
+       unresolvedCall(BuiltInFunctionDefinitions.CAST, operand, targetType)
+   }
+
+ /**
+   * Parses `toTime(expr)` and rewrites it as a CAST of the operand to the data type derived
+   * from `SqlTimeTypeInfo.TIME`.
+   */
+ lazy val prefixToTime: PackratParser[Expression] =
+   TO_TIME ~ "(" ~ expression ~ ")" ^^ {
+     case _ ~ _ ~ operand ~ _ =>
+       val targetType = typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME))
+       unresolvedCall(BuiltInFunctionDefinitions.CAST, operand, targetType)
+   }
+
+ /**
+   * Parses `name.distinct(arg, ...)`: the named function is looked up with the given
+   * arguments and the resulting call is wrapped in a DISTINCT call.
+   */
+ lazy val prefixDistinct: PackratParser[Expression] =
+   functionIdent ~ "." ~ DISTINCT ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+     case functionName ~ _ ~ _ ~ _ ~ arguments ~ _ =>
+       val wrappedCall = lookupCall(functionName, arguments: _*)
+       unresolvedCall(BuiltInFunctionDefinitions.DISTINCT, wrappedCall)
+   }
+
+ /**
+   * Parses `as(expr, name1, name2, ...)` with at least one field name.
+   *
+   * The AS call receives the aliased expression followed by one string literal per field
+   * name.
+   */
+ lazy val prefixAs: PackratParser[Expression] =
+   AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+     case _ ~ _ ~ aliased ~ _ ~ fields ~ _ =>
+       val nameLiterals = fields.map(field => valueLiteral(field.getName))
+       unresolvedCall(
+         BuiltInFunctionDefinitions.AS,
+         aliased :: nameLiterals: _*)
+   }
+
+ /**
+   * Union of all prefix-notation parsers (`keyword(args...)` style).
+   *
+   * The alternatives are combined with `|` in the order listed below, so the order is part of
+   * the grammar: the special-purpose parsers are listed first and the generic function-call
+   * parsers are deliberately kept last.
+   */
+ lazy val prefixed: PackratParser[Expression] =
+ // expressions that need to be resolved early
+ prefixFlattening |
+ // expressions that need special expression conversion
+ prefixAs| prefixToTimestamp | prefixToTime | prefixToDate |
+ // expressions that take enumerations
+ prefixCast | prefixTrim | prefixTrimWithoutArgs | prefixExtract | prefixFloor | prefixCeil |
+ prefixTimestampDiff |
+ // expressions that take literals
+ prefixGet |
+ // expression with special identifier
+ prefixIf |
+ // expression with prefix distinct
+ prefixDistinct |
+ // function call must always be at the end
+ prefixFunctionCall | prefixFunctionCallOneArg
+
+ // suffix/prefix composite
+
+ /**
+   * A composite expression: an over call, a suffix-notation expression, a null literal, a
+   * prefix-notation expression or an atom, tried in that order. If none of them matches, the
+   * parser fails with "Composite expression expected.".
+   */
+ lazy val composite: PackratParser[Expression] =
+ over | suffixed | nullLiteral | prefixed | atom |
+ failure("Composite expression expected.")
+
+ // unary ops
+
+ /** Parses `!composite` into a NOT call. */
+ lazy val unaryNot: PackratParser[Expression] =
+   "!" ~ composite ^^ {
+     case _ ~ operand => unresolvedCall(BuiltInFunctionDefinitions.NOT, operand)
+   }
+
+ /** Parses `-composite` into a MINUS_PREFIX call. */
+ lazy val unaryMinus: PackratParser[Expression] =
+   "-" ~ composite ^^ {
+     case _ ~ operand => unresolvedCall(BuiltInFunctionDefinitions.MINUS_PREFIX, operand)
+   }
+
+ /** Parses `+composite`; the sign is dropped and the operand is returned unchanged. */
+ lazy val unaryPlus: PackratParser[Expression] = "+" ~> composite
+
+ /**
+   * A composite expression, optionally preceded by one of the unary operators `!`, `-` or
+   * `+`. The plain composite alternative is listed first. If no alternative matches, the
+   * parser fails with "Unary expression expected.".
+   */
+ lazy val unary: PackratParser[Expression] = composite | unaryNot | unaryMinus | unaryPlus |
+ failure("Unary expression expected.")
+
+ // arithmetic
+
+ /**
+   * Multiplicative level of the grammar: a chain of unary expressions separated by `*`, `/`
+   * or `%`, combined into TIMES, DIVIDE and MOD calls respectively.
+   */
+ lazy val product: PackratParser[Expression] = unary * (
+   "*" ^^^ { (lhs: Expression, rhs: Expression) =>
+     unresolvedCall(BuiltInFunctionDefinitions.TIMES, lhs, rhs)
+   } | "/" ^^^ { (lhs: Expression, rhs: Expression) =>
+     unresolvedCall(BuiltInFunctionDefinitions.DIVIDE, lhs, rhs)
+   } | "%" ^^^ { (lhs: Expression, rhs: Expression) =>
+     unresolvedCall(BuiltInFunctionDefinitions.MOD, lhs, rhs)
+   }) | failure("Product expected.")
+
+ /**
+   * Additive level of the grammar: a chain of products separated by `+` or `-`, combined
+   * into PLUS and MINUS calls respectively.
+   */
+ lazy val term: PackratParser[Expression] = product * (
+   "+" ^^^ { (lhs: Expression, rhs: Expression) =>
+     unresolvedCall(BuiltInFunctionDefinitions.PLUS, lhs, rhs)
+   } | "-" ^^^ { (lhs: Expression, rhs: Expression) =>
+     unresolvedCall(BuiltInFunctionDefinitions.MINUS, lhs, rhs)
+   }) | failure("Term expected.")
+
+ // comparison
+
+ /** Parses `term (=== | == | =) term` into an EQUALS call. */
+ lazy val equalTo: PackratParser[Expression] =
+   term ~ (("===" | "==" | "=") ~> term) ^^ {
+     case lhs ~ rhs => unresolvedCall(BuiltInFunctionDefinitions.EQUALS, lhs, rhs)
+   }
+
+ /** Parses `term (!== | != | <>) term` into a NOT_EQUALS call. */
+ lazy val notEqualTo: PackratParser[Expression] =
+   term ~ (("!==" | "!=" | "<>") ~> term) ^^ {
+     case lhs ~ rhs => unresolvedCall(BuiltInFunctionDefinitions.NOT_EQUALS, lhs, rhs)
+   }
+
+ /** Parses `term > term` into a GREATER_THAN call. */
+ lazy val greaterThan: PackratParser[Expression] =
+   term ~ (">" ~> term) ^^ {
+     case lhs ~ rhs => unresolvedCall(BuiltInFunctionDefinitions.GREATER_THAN, lhs, rhs)
+   }
+
+ /** Parses `term >= term` into a GREATER_THAN_OR_EQUAL call. */
+ lazy val greaterThanOrEqual: PackratParser[Expression] =
+   term ~ (">=" ~> term) ^^ {
+     case lhs ~ rhs =>
+       unresolvedCall(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, lhs, rhs)
+   }
+
+ /** Parses `term < term` into a LESS_THAN call. */
+ lazy val lessThan: PackratParser[Expression] =
+   term ~ ("<" ~> term) ^^ {
+     case lhs ~ rhs => unresolvedCall(BuiltInFunctionDefinitions.LESS_THAN, lhs, rhs)
+   }
+
+ /** Parses `term <= term` into a LESS_THAN_OR_EQUAL call. */
+ lazy val lessThanOrEqual: PackratParser[Expression] =
+   term ~ ("<=" ~> term) ^^ {
+     case lhs ~ rhs =>
+       unresolvedCall(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, lhs, rhs)
+   }
+
+ /**
+   * Comparison level of the grammar: one of the binary comparison parsers, or a plain term
+   * when no comparison operator follows. If no alternative matches, the parser fails with
+   * "Comparison expected.".
+   *
+   * NOTE(review): `greaterThan` / `lessThan` are listed before their `...OrEqual`
+   * counterparts; this presumably relies on the alternation falling through when the
+   * right-hand term fails to parse after `>` or `<` -- confirm before reordering.
+   */
+ lazy val comparison: PackratParser[Expression] =
+ equalTo | notEqualTo |
+ greaterThan | greaterThanOrEqual |
+ lessThan | lessThanOrEqual | term |
+ failure("Comparison expected.")
+
+ // logic
+
+ /**
+   * Logical level of the grammar: a chain of comparisons separated by `&&` or `||`, combined
+   * into AND and OR calls respectively.
+   */
+ lazy val logic: PackratParser[Expression] = comparison * (
+   "&&" ^^^ { (lhs: Expression, rhs: Expression) =>
+     unresolvedCall(BuiltInFunctionDefinitions.AND, lhs, rhs)
+   } | "||" ^^^ { (lhs: Expression, rhs: Expression) =>
+     unresolvedCall(BuiltInFunctionDefinitions.OR, lhs, rhs)
+   }) | failure("Logic expected.")
+
+ // time indicators
+
+ /** A time indicator: either `field.proctime` or `field.rowtime`. */
+ lazy val timeIndicator: PackratParser[Expression] = proctime | rowtime
+
+ /** Parses `field.proctime` into a PROCTIME call on the field reference. */
+ lazy val proctime: PackratParser[Expression] =
+   (fieldReference <~ "." <~ PROCTIME) ^^ { field =>
+     unresolvedCall(BuiltInFunctionDefinitions.PROCTIME, field)
+   }
+
+ /** Parses `field.rowtime` into a ROWTIME call on the field reference. */
+ lazy val rowtime: PackratParser[Expression] =
+   (fieldReference <~ "." <~ ROWTIME) ^^ { field =>
+     unresolvedCall(BuiltInFunctionDefinitions.ROWTIME, field)
+   }
+
+ // alias
+
+ /**
+   * Alias level of the grammar. Accepts, in this order:
+   *  - `logic as name`, producing an AS call with a single name literal,
+   *  - `logic as (name1, name2, ...)`, producing an AS call with one literal per name,
+   *  - a plain `logic` expression without alias.
+   */
+ lazy val alias: PackratParser[Expression] =
+   logic ~ (AS ~> fieldReference) ^^ {
+     case aliased ~ field =>
+       unresolvedCall(BuiltInFunctionDefinitions.AS, aliased, valueLiteral(field.getName))
+   } | logic ~ ((AS ~ "(") ~> rep1sep(fieldReference, ",") <~ ")") ^^ {
+     case aliased ~ fields =>
+       unresolvedCall(
+         BuiltInFunctionDefinitions.AS,
+         aliased :: fields.map(field => valueLiteral(field.getName)): _*)
+   } | logic
+
+ /**
+   * Parses `field as name`, where both sides are field references, into an AS call with the
+   * new name as a string literal.
+   */
+ lazy val aliasMapping: PackratParser[Expression] =
+   fieldReference ~ (AS ~> fieldReference) ^^ {
+     case source ~ target =>
+       unresolvedCall(BuiltInFunctionDefinitions.AS, source, valueLiteral(target.getName))
+   }
+
+ // columns
+
+ /** Parses `fieldA to fieldB` into a RANGE_TO call on the two field references. */
+ lazy val fieldNameRange: PackratParser[Expression] =
+   fieldReference ~ (TO ~> fieldReference) ^^ {
+     case first ~ last => unresolvedCall(BuiltInFunctionDefinitions.RANGE_TO, first, last)
+   }
+
+ /** Parses `n to m` with number literals into a RANGE_TO call. */
+ lazy val fieldIndexRange: PackratParser[Expression] =
+   numberLiteral ~ (TO ~> numberLiteral) ^^ {
+     case first ~ last => unresolvedCall(BuiltInFunctionDefinitions.RANGE_TO, first, last)
+   }
+
+ /** A column range given either by field names or by number literals. */
+ lazy val range = fieldNameRange | fieldIndexRange
+
+ /**
+   * Top-level expression: a column range, an over constant, or an (optionally aliased)
+   * logic expression, tried in that order. If none matches, the parser fails with
+   * "Invalid expression.".
+   */
+ lazy val expression: PackratParser[Expression] = range | overConstant | alias |
+ failure("Invalid expression.")
+
+ /** One or more expressions separated by commas. */
+ lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
+
+ /**
+   * Parses a comma-separated list of expressions. The whole input must be consumed.
+   *
+   * @param expression the string to parse
+   * @return the parsed expressions
+   * @throws ExpressionParserException if the input cannot be parsed (raised via `throwError`)
+   */
+ def parseExpressionList(expression: String): JList[Expression] = {
+   val outcome = parseAll(expressionList, expression)
+   outcome match {
+     case NoSuccess(message, remaining) =>
+       throwError(message, remaining)
+
+     case Success(expressions, _) => expressions
+   }
+ }
+
+ /**
+   * Parses a single expression. The whole input must be consumed.
+   *
+   * @param exprString the string to parse
+   * @return the parsed expression
+   * @throws ExpressionParserException if the input cannot be parsed (raised via `throwError`)
+   */
+ def parseExpression(exprString: String): Expression = {
+   val outcome = parseAll(expression, exprString)
+   outcome match {
+     case NoSuccess(message, remaining) =>
+       throwError(message, remaining)
+
+     case Success(parsed, _) => parsed
+   }
+ }
+
+ /**
+   * Raises an [[ExpressionParserException]] describing a parse failure.
+   *
+   * The parser's end-of-input regex description is replaced by "End of expression", and the
+   * message includes the column and the long-form position of the remaining input.
+   *
+   * @param msg  failure message reported by the parser
+   * @param next remaining input at the point of failure
+   */
+ private def throwError(msg: String, next: Input): Nothing = {
+   val readableMsg = msg.replace("string matching regex `\\z'", "End of expression")
+   val position = next.pos
+   val report =
+     s"""Could not parse expression at column ${position.column}: $readableMsg
+        |${position.longString}""".stripMargin
+
+   throw ExpressionParserException(report)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
new file mode 100644
index 0000000..7f7397f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+/**
+  * Helper predicates and converters for [[PlannerExpression]]s: literal classification
+  * (time interval vs. row count), time attribute checks on resolved field references, and
+  * extraction of the literal values.
+  */
+object PlannerExpressionUtils {
+
+  /** Returns true if `expr` is a literal whose type is `INTERVAL_MILLIS`. */
+  private[flink] def isTimeIntervalLiteral(expr: PlannerExpression): Boolean = expr match {
+    case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
+    case _ => false
+  }
+
+  /** Returns true if `expr` is a literal whose type is `LONG_TYPE_INFO`. */
+  private[flink] def isRowCountLiteral(expr: PlannerExpression): Boolean = expr match {
+    case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) => true
+    case _ => false
+  }
+
+  /** Returns true if `expr` is a resolved field reference with a time indicator type. */
+  private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr match {
+    case r: ResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
+      true
+    case _ => false
+  }
+
+  /** Returns true if `expr` is a resolved field reference with a rowtime indicator type. */
+  private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean = expr match {
+    case r: ResolvedFieldReference
+      if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) =>
+      true
+    case _ => false
+  }
+
+  /** Returns true if `expr` is a resolved field reference with a proctime indicator type. */
+  private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean = expr match {
+    case r: ResolvedFieldReference
+      if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) =>
+      true
+    case _ => false
+  }
+
+  /**
+    * Converts a millisecond interval literal into a [[FlinkTime]].
+    *
+    * @param expr a literal of type `INTERVAL_MILLIS` holding a `Long` value
+    * @return the interval as `FlinkTime.milliseconds(value)`
+    * @throws IllegalArgumentException if `expr` is not such a literal
+    */
+  private[flink] def toTime(expr: PlannerExpression): FlinkTime = expr match {
+    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+      FlinkTime.milliseconds(value)
+    case _ =>
+      // Name the offending expression so the failure can be traced back to its source.
+      throw new IllegalArgumentException(
+        s"Expected a time interval literal of type INTERVAL_MILLIS, but got: $expr")
+  }
+
+  /**
+    * Extracts the value of a `Long` literal.
+    *
+    * @param expr a literal of type `LONG_TYPE_INFO` holding a `Long` value
+    * @return the literal's value
+    * @throws IllegalArgumentException if `expr` is not such a literal
+    */
+  private[flink] def toLong(expr: PlannerExpression): Long = expr match {
+    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) => value
+    case _ =>
+      // Name the offending expression so the failure can be traced back to its source.
+      throw new IllegalArgumentException(
+        s"Expected a literal of type LONG_TYPE_INFO, but got: $expr")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
new file mode 100644
index 0000000..09174dc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.MultisetTypeInfo
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.functions.utils.AggSqlFunction
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.functions.{AggregateFunction, FunctionRequirement, TableAggregateFunction, UserDefinedAggregateFunction}
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.fun.SqlSumAggFunction
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.tools.RelBuilder.AggCall
+
+/**
+  * Base class of all aggregation expressions.
+  *
+  * An aggregation cannot be turned into a [[RexNode]] by default; `toRexNode` throws an
+  * [[UnsupportedOperationException]]. Subclasses provide the Calcite aggregate call and the
+  * matching [[SqlAggFunction]].
+  */
+sealed abstract class Aggregation extends PlannerExpression {
+
+  override def toString: String = "Aggregate"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    throw new UnsupportedOperationException("Aggregate cannot be transformed to RexNode")
+  }
+
+  /**
+    * Converts this aggregation into its Calcite counterpart, an [[AggCall]].
+    *
+    * @param name       name passed on to the aggregate call
+    * @param isDistinct whether the aggregate call is distinct
+    */
+  private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false
+  )(implicit relBuilder: RelBuilder): AggCall
+
+  /**
+    * Returns the [[SqlAggFunction]] for this aggregation.
+    */
+  private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction
+
+}
+
+/**
+  * Marks the wrapped aggregation as distinct.
+  *
+  * Validation fails unless the child is an [[Aggregation]]; the aggregate call and the SQL
+  * aggregate function are delegated to the child, with the distinct flag forced to true.
+  */
+case class DistinctAgg(child: PlannerExpression) extends Aggregation {
+
+  // The wrapped expression viewed as an aggregation; the cast fails for any other child.
+  private def wrappedAggregation: Aggregation = child.asInstanceOf[Aggregation]
+
+  def distinct: PlannerExpression = DistinctAgg(child)
+
+  override private[flink] def children = Seq(child)
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    super.validateInput()
+    child match {
+      case _: Aggregation =>
+        child.validateInput()
+      case _ =>
+        val reason = s"Distinct modifier cannot be applied to $child! " +
+          "It can only be applied to an aggregation expression, for example, " +
+          "'a.count.distinct which is equivalent with COUNT(DISTINCT a)."
+        ValidationFailure(reason)
+    }
+  }
+
+  // The incoming isDistinct value is ignored: a DistinctAgg always requests a distinct call.
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = true)(implicit relBuilder: RelBuilder): AggCall =
+    wrappedAggregation.toAggCall(name, isDistinct = true)
+
+  override private[flink] def getSqlAggFunction()(
+      implicit relBuilder: RelBuilder): SqlAggFunction =
+    wrappedAggregation.getSqlAggFunction()
+}
+
+/**
+  * SUM aggregation over a numeric child expression; the result type equals the child's type.
+  */
+case class Sum(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"sum($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "sum")
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.SUM, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  // Builds a SUM function whose return type is derived from this expression's result type.
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val sumType = typeFactory.createFieldTypeFromLogicalType(
+      fromTypeInfoToLogicalType(resultType))
+    new SqlSumAggFunction(sumType)
+  }
+}
+
+/**
+  * SUM0 aggregation over a numeric child expression; the result type equals the child's
+  * type.
+  */
+case class Sum0(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"sum0($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "sum0")
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.SUM0, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.SUM0
+  }
+}
+
+/**
+  * MIN aggregation over an orderable child expression; the result type equals the child's
+  * type.
+  */
+case class Min(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"min($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertOrderableExpr(child.resultType, "min")
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.MIN, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.MIN
+}
+
+/**
+  * MAX aggregation over an orderable child expression; the result type equals the child's
+  * type.
+  */
+case class Max(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"max($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertOrderableExpr(child.resultType, "max")
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.MAX, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.MAX
+}
+
+/**
+  * COUNT aggregation over a child expression; the result type is always `LONG_TYPE_INFO`.
+  */
+case class Count(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"count($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.COUNT, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.COUNT
+}
+
+/**
+  * AVG aggregation over a numeric child expression; the result type equals the child's type.
+  */
+case class Avg(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"avg($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "avg")
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.AVG, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.AVG
+}
+
+/**
+  * COLLECT aggregation: aggregates the child's values into a multiset. The result type is
+  * the multiset type of the child's result type.
+  */
+case class Collect(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"collect($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType: TypeInformation[_] =
+    MultisetTypeInfo.getInfoFor(child.resultType)
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.COLLECT, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) =
+    FlinkSqlOperatorTable.COLLECT
+}
+
+/**
+  * STDDEV_POP aggregation over a numeric child expression; the result type equals the
+  * child's type.
+  */
+case class StddevPop(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"stddev_pop($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "stddev_pop")
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.STDDEV_POP, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.STDDEV_POP
+  }
+}
+
+/**
+  * STDDEV_SAMP aggregation over a numeric child expression; the result type equals the
+  * child's type.
+  */
+case class StddevSamp(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"stddev_samp($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "stddev_samp")
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.STDDEV_SAMP, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.STDDEV_SAMP
+  }
+}
+
+/**
+  * VAR_POP aggregation over a numeric child expression; the result type equals the child's
+  * type.
+  */
+case class VarPop(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"var_pop($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "var_pop")
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.VAR_POP, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.VAR_POP
+  }
+}
+
+/**
+  * VAR_SAMP aggregation over a numeric child expression; the result type equals the child's
+  * type.
+  */
+case class VarSamp(child: PlannerExpression) extends Aggregation {
+
+  override def toString: String = s"var_samp($child)"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(child)
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "var_samp")
+
+  override private[flink] def toAggCall(
+      name: String,
+      isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operand.
+    relBuilder.aggregateCall(
+      FlinkSqlOperatorTable.VAR_SAMP, isDistinct, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    FlinkSqlOperatorTable.VAR_SAMP
+  }
+}
+
+/**
+  * Expression for calling a user-defined aggregate function.
+  *
+  * Construction fails with an [[UnsupportedOperationException]] for a
+  * [[TableAggregateFunction]]; any other function is cast to [[AggregateFunction]].
+  *
+  * @param aggregateFunction the user-defined aggregate function to call
+  * @param resultTypeInfo    type information of the aggregation result
+  * @param accTypeInfo       type information of the accumulator
+  * @param args              argument expressions of the call
+  */
+case class AggFunctionCall(
+    aggregateFunction: UserDefinedAggregateFunction[_, _],
+    resultTypeInfo: TypeInformation[_],
+    accTypeInfo: TypeInformation[_],
+    args: Seq[PlannerExpression])
+  extends Aggregation {
+
+  if (aggregateFunction.isInstanceOf[TableAggregateFunction[_, _]]) {
+    throw new UnsupportedOperationException("TableAggregateFunction is unsupported now.")
+  }
+
+  private val aggFunction = aggregateFunction.asInstanceOf[AggregateFunction[_, _]]
+
+  override private[flink] def children: Seq[PlannerExpression] = args
+
+  override def resultType: TypeInformation[_] = resultTypeInfo
+
+  /**
+    * Checks that the argument types match one of the function's `accumulate` signatures.
+    * On mismatch the failure lists the actual argument types and the expected signatures.
+    */
+  override def validateInput(): ValidationResult = {
+    val signature = children.map(_.resultType)
+    // look for a signature that matches the input types
+    val foundSignature = getAccumulateMethodSignature(
+      aggFunction,
+      signature.map(fromTypeInfoToLogicalType))
+    if (foundSignature.isEmpty) {
+      ValidationFailure(s"Given parameters do not match any signature. \n" +
+        s"Actual: ${
+          signatureToString(signature.map(fromLegacyInfoToDataType))} \n" +
+        s"Expected: ${
+          getMethodSignatures(aggregateFunction, "accumulate")
+            .map(_.drop(1)) // the first parameter of each signature is not reported
+            .map(signatureToString)
+            .sorted // make sure order to verify error messages in tests
+            .mkString(", ")}")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override def toString: String = s"${aggregateFunction.getClass.getSimpleName}($args)"
+
+  override def toAggCall(
+      name: String, isDistinct: Boolean = false)(implicit relBuilder: RelBuilder): AggCall = {
+    // Positional arguments after isDistinct: `false` and `null` (presumably Calcite's
+    // "approximate" flag and filter -- TODO confirm), then the alias and the operands.
+    relBuilder.aggregateCall(
+      this.getSqlAggFunction(),
+      isDistinct,
+      false,
+      null,
+      name,
+      args.map(_.toRexNode): _*)
+  }
+
+  /**
+    * Wraps the user-defined function into an [[AggSqlFunction]] using the builder's
+    * [[FlinkTypeFactory]].
+    */
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    // The constructor already guarantees an AggregateFunction, so its requirements can be
+    // queried directly without matching on the type again.
+    val requiresOver =
+      aggFunction.getRequirements.contains(FunctionRequirement.OVER_WINDOW_ONLY)
+
+    AggSqlFunction(
+      aggregateFunction.functionIdentifier,
+      aggregateFunction.toString,
+      aggFunction,
+      fromLegacyInfoToDataType(resultType),
+      fromLegacyInfoToDataType(accTypeInfo),
+      typeFactory,
+      requiresOver)
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(this.getSqlAggFunction(), args.map(_.toRexNode): _*)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
new file mode 100644
index 0000000..49212b4
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.{fromLogicalTypeToTypeInfo, fromTypeInfoToLogicalType}
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils._
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+/**
+  * Base class of binary arithmetic expressions.
+  *
+  * By default both operands must be numeric, the result type is the wider of the two operand
+  * types, and the expression is translated into a call of `sqlOperator` on its children.
+  */
+abstract class BinaryArithmetic extends BinaryExpression {
+
+  /** The Calcite operator this expression is translated to. */
+  private[flink] def sqlOperator: SqlOperator
+
+  override private[flink] def resultType: TypeInformation[_] = {
+    val widerType = TypeCoercion.widerTypeOf(
+      fromTypeInfoToLogicalType(left.resultType),
+      fromTypeInfoToLogicalType(right.resultType))
+    widerType match {
+      case None =>
+        throw new RuntimeException("This should never happen.")
+      case Some(logicalType) => fromLogicalTypeToTypeInfo(logicalType)
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(sqlOperator, children.map(_.toRexNode))
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"The arithmetic '$this' requires both operands to be numeric, but was " +
+        s"'$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
+    }
+  }
+}
+
+/**
+  * Addition. Besides numeric addition it also covers string concatenation (when either
+  * operand is a string), addition of equal time interval types, and adding a time interval
+  * to a time point.
+  */
+case class Plus(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.PLUS
+
+  override def toString: String = s"($left + $right)"
+
+  // The branches below are checked in order; the first matching one decides the translation.
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    def leftType = left.resultType
+    def rightType = right.resultType
+
+    if (isString(leftType)) {
+      // string + anything: cast the right side to string and concatenate
+      val rightAsString = Cast(right, BasicTypeInfo.STRING_TYPE_INFO)
+      relBuilder.call(FlinkSqlOperatorTable.CONCAT, left.toRexNode, rightAsString.toRexNode)
+    } else if (isString(rightType)) {
+      // anything + string: cast the left side to string and concatenate
+      val leftAsString = Cast(left, BasicTypeInfo.STRING_TYPE_INFO)
+      relBuilder.call(FlinkSqlOperatorTable.CONCAT, leftAsString.toRexNode, right.toRexNode)
+    } else if (isTimeInterval(leftType) && leftType == rightType) {
+      relBuilder.call(FlinkSqlOperatorTable.PLUS, left.toRexNode, right.toRexNode)
+    } else if (isTimeInterval(leftType) && isTemporal(rightType)) {
+      // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left)
+      // we manually switch them here
+      relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, right.toRexNode, left.toRexNode)
+    } else if (isTemporal(leftType) && isTemporal(rightType)) {
+      relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, left.toRexNode, right.toRexNode)
+    } else {
+      // remaining case: cast both operands to the common result type and add them
+      val widenedLeft = Cast(left, resultType)
+      val widenedRight = Cast(right, resultType)
+      relBuilder.call(FlinkSqlOperatorTable.PLUS, widenedLeft.toRexNode, widenedRight.toRexNode)
+    }
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    def leftType = left.resultType
+    def rightType = right.resultType
+
+    val supported =
+      isString(leftType) || isString(rightType) ||
+        (isTimeInterval(leftType) && leftType == rightType) ||
+        (isTimePoint(leftType) && isTimeInterval(rightType)) ||
+        (isTimeInterval(leftType) && isTimePoint(rightType)) ||
+        (isNumeric(leftType) && isNumeric(rightType))
+
+    if (supported) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"The arithmetic '$this' requires input that is numeric, string, time intervals of the " +
+        s"same type, or a time interval and a time point type, " +
+        s"but was '$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
+    }
+  }
+}
+
+/**
+  * Arithmetic negation of a numeric or time interval expression; the result type equals the
+  * child's type.
+  */
+case class UnaryMinus(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString: String = s"-($child)"
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.UNARY_MINUS, child.toRexNode)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val supported = isNumeric(child.resultType) || isTimeInterval(child.resultType)
+    if (supported) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"The arithmetic '$this' requires input that is numeric or a time " +
+        s"interval type, but was '${child.resultType}'.")
+    }
+  }
+}
+
+/**
+  * Subtraction of numeric values, of equal time interval types, or of a time interval and a
+  * time point.
+  */
+case class Minus(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.MINUS
+
+  override def toString: String = s"($left - $right)"
+
+  override private[flink] def validateInput(): ValidationResult = {
+    def leftType = left.resultType
+    def rightType = right.resultType
+
+    // NOTE(review): "interval - time point" is accepted here just like
+    // "time point - interval"; confirm that this is intended.
+    val supported =
+      (isTimeInterval(leftType) && leftType == rightType) ||
+        (isTimePoint(leftType) && isTimeInterval(rightType)) ||
+        (isTimeInterval(leftType) && isTimePoint(rightType)) ||
+        (isNumeric(leftType) && isNumeric(rightType))
+
+    if (supported) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"The arithmetic '$this' requires inputs that are numeric, time intervals of the same " +
+        s"type, or a time interval and a time point type, " +
+        s"but was '$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
+    }
+  }
+}
+
+/** Division of two numeric expressions, translated to the DIVIDE operator. */
+case class Div(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.DIVIDE
+
+  override def toString: String = s"($left / $right)"
+}
+
+/** Multiplication of two numeric expressions, translated to the MULTIPLY operator. */
+case class Mul(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.MULTIPLY
+
+  override def toString: String = s"($left * $right)"
+}
+
+/** Modulo of two numeric expressions, translated to the MOD operator. */
+case class Mod(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = FlinkSqlOperatorTable.MOD
+
+  override def toString: String = s"($left % $right)"
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
new file mode 100644
index 0000000..b9b0b3f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import java.util
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rex.RexWindowBound._
+import org.apache.calcite.rex.{RexFieldCollation, RexNode, RexWindowBound}
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.OrdinalReturnTypeInference
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api._
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions._
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
+import org.apache.flink.table.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+  * Over call whose over window is still referenced by an unresolved alias.
+  *
+  * Validation of this expression always fails, it reports its aggregation's result type and
+  * exposes no children.
+  *
+  * @param agg The aggregation of the over call.
+  * @param alias The alias of the referenced over window.
+  */
+case class UnresolvedOverCall(agg: PlannerExpression, alias: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children = Seq.empty
+
+  override private[flink] def resultType = agg.resultType
+
+  // an alias that is still unresolved at validation time is always an error
+  override private[flink] def validateInput() =
+    ValidationFailure(s"Over window with alias $alias could not be resolved.")
+}
+
+/**
+  * Over expression for Calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param partitionBy The fields by which the over window is partitioned
+  * @param orderBy The field by which the over window is sorted
+  * @param preceding The lower bound of the window
+  * @param following The upper bound of the window
+  */
+case class OverCall(
+    agg: PlannerExpression,
+    partitionBy: Seq[PlannerExpression],
+    orderBy: PlannerExpression,
+    preceding: PlannerExpression,
+    following: PlannerExpression) extends PlannerExpression {
+
+  override def toString: String = s"$agg OVER (" +
+    s"PARTITION BY (${partitionBy.mkString(", ")}) " +
+    s"ORDER BY $orderBy " +
+    s"PRECEDING $preceding " +
+    s"FOLLOWING $following)"
+
+  /**
+    * Translates this over call into a Calcite `RexOver` node.
+    *
+    * `agg` is cast to `Aggregation` without a check; `validateInput()` is the place where a
+    * non-aggregation is rejected.
+    */
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+
+    val rexBuilder = relBuilder.getRexBuilder
+
+    // assemble aggregation
+    val operator: SqlAggFunction = agg.asInstanceOf[Aggregation].getSqlAggFunction()
+    val aggResultType = relBuilder
+      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .createFieldTypeFromLogicalType(fromTypeInfoToLogicalType(agg.resultType))
+
+    // assemble exprs by agg children
+    val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava
+
+    // assemble order by key
+    val orderKey = new RexFieldCollation(orderBy.toRexNode, Set[SqlKind]().asJava)
+    val orderKeys = ImmutableList.of(orderKey)
+
+    // assemble partition by keys
+    val partitionKeys = partitionBy.map(_.toRexNode).asJava
+
+    // assemble bounds; a LONG typed preceding bound marks the window as physical
+    val isPhysical: Boolean = preceding.resultType == BasicTypeInfo.LONG_TYPE_INFO
+
+    val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
+    val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
+
+    // build RexOver
+    // NOTE(review): the meaning of the three trailing boolean flags follows the parameter
+    // order of RexBuilder.makeOver in the Calcite version in use -- confirm when upgrading
+    rexBuilder.makeOver(
+      aggResultType,
+      operator,
+      aggExprs,
+      partitionKeys,
+      orderKeys,
+      lowerBound,
+      upperBound,
+      isPhysical,
+      true,
+      false,
+      false)
+  }
+
+  /**
+    * Creates the Calcite window bound for one side of the window.
+    *
+    * @param relBuilder builder used to create literals and calls
+    * @param bound the bound expression (unbounded, current row/range or a literal)
+    * @param sqlKind `SqlKind.PRECEDING` for the lower or `SqlKind.FOLLOWING` for the upper bound
+    */
+  private def createBound(
+      relBuilder: RelBuilder,
+      bound: PlannerExpression,
+      sqlKind: SqlKind): RexWindowBound = {
+
+    bound match {
+      case _: UnboundedRow | _: UnboundedRange =>
+        // an unbounded upper bound has to be UNBOUNDED FOLLOWING, not UNBOUNDED PRECEDING
+        val unbounded =
+          if (sqlKind == SqlKind.FOLLOWING) {
+            SqlWindow.createUnboundedFollowing(SqlParserPos.ZERO)
+          } else {
+            SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+          }
+        create(unbounded, null)
+      case _: CurrentRow | _: CurrentRange =>
+        val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+        create(currentRow, null)
+      case b: Literal =>
+        val returnType = relBuilder
+          .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+          .createFieldTypeFromLogicalType(fromTypeInfoToLogicalType(Types.DECIMAL))
+
+        val sqlOperator = new SqlPostfixOperator(
+          sqlKind.name,
+          sqlKind,
+          2,
+          new OrdinalReturnTypeInference(0),
+          null,
+          null)
+
+        // the SqlNode only carries the bound's kind; the actual offset lives in the RexNode
+        val operands: Array[SqlNode] = new Array[SqlNode](1)
+        operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)
+
+        val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+        val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+        expressions.add(relBuilder.literal(b.value))
+
+        val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
+
+        create(node, rexNode)
+    }
+  }
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    Seq(agg) ++ Seq(orderBy) ++ partitionBy ++ Seq(preceding) ++ Seq(following)
+
+  override private[flink] def resultType = agg.resultType
+
+  /**
+    * Validates the over call. The checks run in a fixed order and the first failure is
+    * returned: aggregation, partition keys, preceding bound, following bound, equal bound
+    * types and finally the time attribute of the ordering.
+    */
+  override private[flink] def validateInput(): ValidationResult = {
+    aggregationFailure
+      .orElse(partitionKeyFailure)
+      .orElse(boundFailure(preceding, "Preceding"))
+      .orElse(boundFailure(following, "Following"))
+      .orElse(boundTypeFailure)
+      .orElse(orderingFailure)
+      .getOrElse(ValidationSuccess)
+  }
+
+  /** Fails if `agg` is not an aggregation. */
+  private def aggregationFailure: Option[ValidationResult] = agg match {
+    case _: Aggregation => None
+    case _ => Some(ValidationFailure(s"OVER can only be applied on an aggregation."))
+  }
+
+  /** Fails for the first partition key that is not a resolved field reference of key type. */
+  private def partitionKeyFailure: Option[ValidationResult] = {
+    val failures = partitionBy.iterator.map[Option[ValidationResult]] {
+      case r: ResolvedFieldReference if r.resultType.isKeyType =>
+        None
+      case r: ResolvedFieldReference =>
+        Some(ValidationFailure(s"Invalid PartitionBy expression: $r. " +
+          s"Expression must return key type."))
+      case r =>
+        Some(ValidationFailure(s"Invalid PartitionBy expression: $r. " +
+          s"Expression must be a resolved field reference."))
+    }
+    // the iterator is lazy, so keys after the first invalid one are not inspected
+    failures.collectFirst { case Some(failure) => failure }
+  }
+
+  /**
+    * Fails if `bound` is neither an unbounded/current marker nor a valid row or time interval
+    * literal. `name` ("Preceding" or "Following") is only used for the error message.
+    */
+  private def boundFailure(bound: PlannerExpression, name: String): Option[ValidationResult] =
+    bound match {
+      case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange =>
+        None
+      case Literal(v: Long, BasicTypeInfo.LONG_TYPE_INFO) if v > 0 =>
+        None
+      case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) =>
+        Some(ValidationFailure(s"$name row interval must be larger than 0."))
+      case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 =>
+        None
+      case Literal(_, _: TimeIntervalTypeInfo[_]) =>
+        Some(ValidationFailure(s"$name time interval must be equal or larger than 0."))
+      case _ =>
+        // also covers non-literal expressions, which must not end in a MatchError
+        Some(ValidationFailure(s"$name must be a row interval or time interval literal."))
+    }
+
+  /** Fails if the preceding and the following bound have different result types. */
+  private def boundTypeFailure: Option[ValidationResult] =
+    if (preceding.resultType == following.resultType) {
+      None
+    } else {
+      Some(ValidationFailure("Preceding and following must be of same interval type."))
+    }
+
+  /** Fails if the window is not ordered by a time attribute. */
+  private def orderingFailure: Option[ValidationResult] =
+    if (PlannerExpressionUtils.isTimeAttribute(orderBy)) {
+      None
+    } else {
+      Some(ValidationFailure("Ordering must be defined on a time attribute."))
+    }
+}
+
+/**
+  * Expression for calling a user-defined scalar functions.
+  *
+  * @param scalarFunction scalar function to be called (might be overloaded)
+  * @param parameters actual parameters that determine target evaluation method
+  */
+case class PlannerScalarFunctionCall(
+    scalarFunction: ScalarFunction,
+    parameters: Seq[PlannerExpression])
+  extends PlannerExpression {
+
+  // Logical types of the actual parameters. They are derived on first use instead of being
+  // assigned during validation, so `resultType` never sees a null signature on an instance
+  // that has not been validated itself (for example a copy of a validated expression).
+  private lazy val signature: Array[LogicalType] =
+    parameters.map(p => fromTypeInfoToLogicalType(p.resultType)).toArray
+
+  override private[flink] def children: Seq[PlannerExpression] = parameters
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    relBuilder.call(
+      createScalarSqlFunction(
+        scalarFunction.functionIdentifier,
+        scalarFunction.toString,
+        scalarFunction,
+        typeFactory),
+      parameters.map(_.toRexNode): _*)
+  }
+
+  override def toString =
+    s"${scalarFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
+
+  // the result type is looked up for the signature formed by the actual parameter types
+  override private[flink] def resultType =
+    fromDataTypeToTypeInfo(getResultTypeOfScalarFunction(
+      scalarFunction,
+      Array(),
+      signature))
+
+  /** Succeeds if the function offers an `eval` signature matching the parameter types. */
+  override private[flink] def validateInput(): ValidationResult = {
+    // look for a signature that matches the input types
+    val foundSignature = getEvalMethodSignatureOption(scalarFunction, signature)
+    if (foundSignature.isEmpty) {
+      ValidationFailure(s"Given parameters do not match any signature. \n" +
+        s"Actual: ${signatureToString(signature.map(fromLogicalTypeToDataType))} \n" +
+        s"Expected: ${signaturesToString(scalarFunction, "eval")}")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+/**
+  * Expression for calling a user-defined table function with actual parameters.
+  *
+  * @param functionName function name
+  * @param tableFunction user-defined table function
+  * @param parameters actual parameters of function
+  * @param resultType type information of returned table
+  */
+case class PlannerTableFunctionCall(
+    functionName: String,
+    tableFunction: TableFunction[_],
+    parameters: Seq[PlannerExpression],
+    resultType: TypeInformation[_])
+  extends PlannerExpression {
+
+  override def toString =
+    s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
+
+  override private[flink] def children: Seq[PlannerExpression] = parameters
+
+  /**
+    * Runs the singleton and instantiation checks on the function class and then looks for an
+    * `eval` method that matches the types of the actual parameters.
+    */
+  override def validateInput(): ValidationResult = {
+    val functionClass = tableFunction.getClass
+    // the function must not be a Scala object ...
+    UserFunctionsTypeHelper.validateNotSingleton(functionClass)
+    // ... and its class has to be instantiable
+    UserFunctionsTypeHelper.validateInstantiation(functionClass)
+
+    // data types of the actual parameters, used to pick the evaluation method
+    val signature = parameters.map(p => fromLegacyInfoToDataType(p.resultType))
+    val evalMethod = getUserDefinedMethod(tableFunction, "eval", signature)
+
+    if (evalMethod.isEmpty) {
+      ValidationFailure(
+        s"Given parameters of function '$functionName' do not match any signature. \n" +
+          s"Actual: ${signatureToString(signature)} \n" +
+          s"Expected: ${signaturesToString(tableFunction, "eval")}")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/cast.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/cast.scala
new file mode 100644
index 0000000..98f4b2e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/cast.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+/**
+  * Cast of `child` to the given `resultType`.
+  *
+  * The generated cast takes over the nullability of the child's Calcite type.
+  */
+case class Cast(child: PlannerExpression, resultType: TypeInformation[_])
+  extends UnaryExpression {
+
+  override def toString: String = s"$child.cast($resultType)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val childRexNode = child.toRexNode
+    // target type with the nullability of the child expression
+    val targetType = typeFactory.createFieldTypeFromLogicalType(
+      fromTypeInfoToLogicalType(resultType).copy(childRexNode.getType.isNullable))
+    // we use abstract cast here because RelBuilder.cast() has too many side effects
+    relBuilder.getRexBuilder.makeAbstractCast(targetType, childRexNode)
+  }
+
+  // only the child is replaced on copy, the target type is kept
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val newChild = anyRefs.head.asInstanceOf[PlannerExpression]
+    copy(newChild, resultType).asInstanceOf[this.type]
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val castable = TypeCoercion.canCast(
+      fromTypeInfoToLogicalType(child.resultType),
+      fromTypeInfoToLogicalType(resultType))
+    if (castable) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Unsupported cast from ${child.resultType} to $resultType")
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/collection.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/collection.scala
new file mode 100644
index 0000000..dcafe7e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/collection.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils.{isArray, isMap}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Constructs a row from the given element expressions.
+  *
+  * The result type is a `RowTypeInfo` built from the element types; empty rows are rejected
+  * by validation.
+  */
+case class RowConstructor(elements: Seq[PlannerExpression]) extends PlannerExpression {
+
+  override def toString: String = elements.mkString("row(", ", ", ")")
+
+  override private[flink] def children: Seq[PlannerExpression] = elements
+
+  override private[flink] def resultType: TypeInformation[_] =
+    new RowTypeInfo(elements.map(_.resultType): _*)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    // the row type itself is created as not nullable
+    val rowType = typeFactory.createFieldTypeFromLogicalType(
+      fromTypeInfoToLogicalType(resultType).copy(false))
+    val operands = elements.map(_.toRexNode).toList.asJava
+    relBuilder.getRexBuilder.makeCall(rowType, FlinkSqlOperatorTable.ROW, operands)
+  }
+
+  override private[flink] def validateInput(): ValidationResult =
+    if (elements.isEmpty) {
+      ValidationFailure("Empty rows are not supported yet.")
+    } else {
+      ValidationSuccess
+    }
+}
+
+/**
+  * Constructs an array from the given element expressions.
+  *
+  * The result type is an object array of the first element's type; validation rejects empty
+  * arrays and arrays whose elements differ in type.
+  */
+case class ArrayConstructor(elements: Seq[PlannerExpression]) extends PlannerExpression {
+
+  override def toString: String = elements.mkString("array(", ", ", ")")
+
+  override private[flink] def children: Seq[PlannerExpression] = elements
+
+  override private[flink] def resultType = ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    // the array type itself is created as not nullable
+    val arrayType = typeFactory.createFieldTypeFromLogicalType(
+      fromTypeInfoToLogicalType(resultType).copy(false))
+    val operands = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(arrayType, FlinkSqlOperatorTable.ARRAY_VALUE_CONSTRUCTOR, operands)
+  }
+
+  override private[flink] def validateInput(): ValidationResult =
+    if (elements.isEmpty) {
+      ValidationFailure("Empty arrays are not supported yet.")
+    } else {
+      // every element has to share the type of the first one
+      val elementType = elements.head.resultType
+      if (elements.exists(_.resultType != elementType)) {
+        ValidationFailure("Not all elements of the array have the same type.")
+      } else {
+        ValidationSuccess
+      }
+    }
+}
+
+/**
+  * Constructs a map from a flat sequence of alternating key and value expressions.
+  *
+  * The key type is taken from the first and the value type from the last element. Validation
+  * rejects empty maps, an odd number of elements and keys or values of differing types.
+  */
+case class MapConstructor(elements: Seq[PlannerExpression]) extends PlannerExpression {
+
+  override def toString: String = {
+    // render every key-value pair as "[key: value]"
+    val entries = elements.grouped(2).map(pair => pair.mkString("[", ": ", "]"))
+    entries.mkString("map(", ", ", ")")
+  }
+
+  override private[flink] def children: Seq[PlannerExpression] = elements
+
+  override private[flink] def resultType: TypeInformation[_] =
+    new MapTypeInfo(elements.head.resultType, elements.last.resultType)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val keyType = typeFactory.createFieldTypeFromLogicalType(
+      fromTypeInfoToLogicalType(elements.head.resultType))
+    val valueType = typeFactory.createFieldTypeFromLogicalType(
+      fromTypeInfoToLogicalType(elements.last.resultType))
+    val mapType = typeFactory.createMapType(keyType, valueType)
+    val operands = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(mapType, FlinkSqlOperatorTable.MAP_VALUE_CONSTRUCTOR, operands)
+  }
+
+  override private[flink] def validateInput(): ValidationResult =
+    if (elements.isEmpty) {
+      ValidationFailure("Empty maps are not supported yet.")
+    } else if (elements.size % 2 != 0) {
+      ValidationFailure("Maps must have an even number of elements to form key-value pairs.")
+    } else if (elements.grouped(2).exists(_.head.resultType != elements.head.resultType)) {
+      // keys are the first element of every pair
+      ValidationFailure("Not all key elements of the map literal have the same type.")
+    } else if (elements.grouped(2).exists(_.last.resultType != elements.last.resultType)) {
+      // values are the second element of every pair
+      ValidationFailure("Not all value elements of the map literal have the same type.")
+    } else {
+      ValidationSuccess
+    }
+}
+
+/**
+  * Applies the `ELEMENT` operator to an array expression.
+  *
+  * The result type is the component type of the array; validation requires an array input.
+  */
+case class ArrayElement(array: PlannerExpression) extends PlannerExpression {
+
+  override def toString: String = s"(${array}).element()"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(array)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.getRexBuilder.makeCall(FlinkSqlOperatorTable.ELEMENT, array.toRexNode)
+
+  // component type of the supported array type infos
+  override private[flink] def resultType = array.resultType match {
+    case objectArray: ObjectArrayTypeInfo[_, _] => objectArray.getComponentInfo
+    case basicArray: BasicArrayTypeInfo[_, _] => basicArray.getComponentInfo
+    case primitiveArray: PrimitiveArrayTypeInfo[_] => primitiveArray.getComponentType
+  }
+
+  override private[flink] def validateInput(): ValidationResult =
+    array.resultType match {
+      case arrayType: TypeInformation[_] if isArray(arrayType) =>
+        ValidationSuccess
+      case other =>
+        ValidationFailure(s"Array expected but was '$other'.")
+    }
+}
+
+/**
+  * Applies the `CARDINALITY` operator to an array or map expression.
+  *
+  * The result is of integer type; validation requires a map or an array input.
+  */
+case class Cardinality(container: PlannerExpression) extends PlannerExpression {
+
+  override def toString: String = s"(${container}).cardinality()"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(container)
+
+  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.getRexBuilder.makeCall(FlinkSqlOperatorTable.CARDINALITY, container.toRexNode)
+
+  override private[flink] def validateInput(): ValidationResult =
+    container.resultType match {
+      // maps are checked before arrays
+      case containerType: TypeInformation[_] if isMap(containerType) || isArray(containerType) =>
+        ValidationSuccess
+      case other =>
+        ValidationFailure(s"Array or map expected but was '$other'.")
+    }
+}
+
+/**
+  * Accesses an entry of an array or map through the `ITEM` operator.
+  *
+  * The result type is the array's component type or the map's value type. Array access
+  * requires an integer index, map access a key of the map's key type.
+  */
+case class ItemAt(container: PlannerExpression, key: PlannerExpression) extends PlannerExpression {
+
+  override def toString: String = s"(${container}).at(${key})"
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(container, key)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder
+      .getRexBuilder
+      .makeCall(FlinkSqlOperatorTable.ITEM, container.toRexNode, key.toRexNode)
+
+  override private[flink] def resultType = container.resultType match {
+    case mapType: MapTypeInfo[_, _] => mapType.getValueTypeInfo
+    case objectArray: ObjectArrayTypeInfo[_, _] => objectArray.getComponentInfo
+    case basicArray: BasicArrayTypeInfo[_, _] => basicArray.getComponentInfo
+    case primitiveArray: PrimitiveArrayTypeInfo[_] => primitiveArray.getComponentType
+  }
+
+  override private[flink] def validateInput(): ValidationResult =
+    container.resultType match {
+
+      case arrayType: TypeInformation[_] if isArray(arrayType) =>
+        key match {
+          case _ if key.resultType != INT_TYPE_INFO =>
+            ValidationFailure(
+              s"Array element access needs an integer index but was '${key.resultType}'.")
+          // check for common user mistake: literal indexes below 1 are rejected
+          case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
+            ValidationFailure(
+              s"Array element access needs an index starting at 1 but was $value.")
+          case _ =>
+            ValidationSuccess
+        }
+
+      case mapType: MapTypeInfo[_, _] =>
+        val expectedKeyType = mapType.getKeyTypeInfo
+        if (key.resultType == expectedKeyType) {
+          ValidationSuccess
+        } else {
+          ValidationFailure(
+            s"Map entry access needs a valid key of type " +
+              s"'$expectedKeyType', found '${key.resultType}'.")
+        }
+
+      case other =>
+        ValidationFailure(s"Array or map expected but was '$other'.")
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/comparison.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/comparison.scala
new file mode 100644
index 0000000..c1517cc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/comparison.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils.{isArray, isComparable, isNumeric}
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+/**
+  * Base class of binary comparisons with a boolean result.
+  *
+  * Subclasses provide the Calcite operator. By default a comparison is valid for two numeric
+  * operands or for two operands of the same comparable type.
+  */
+abstract class BinaryComparison extends BinaryExpression {
+  /** Calcite operator this comparison is translated to. */
+  private[flink] def sqlOperator: SqlOperator
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // hand the operands over as varargs; this avoids depending on the deprecated implicit
+    // Scala-to-Java collection conversion for the Iterable based overload
+    relBuilder.call(sqlOperator, children.map(_.toRexNode): _*)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    (left.resultType, right.resultType) match {
+      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+      case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
+      case (lType, rType) =>
+        ValidationFailure(
+          s"Comparison is only supported for numeric types and " +
+            s"comparable types of same type, got $lType and $rType")
+    }
+}
+
+/**
+  * Equality predicate, rendered as `left === right`.
+  *
+  * Valid for two numeric operands, two operands of equal type, or an array on the left whose
+  * type class equals the one of the right operand.
+  */
+case class EqualTo(left: PlannerExpression, right: PlannerExpression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.EQUALS
+
+  override def toString: String = s"${left} === ${right}"
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val lType = left.resultType
+    val rType = right.resultType
+
+    val compatible =
+      (isNumeric(lType) && isNumeric(rType)) ||
+        lType == rType ||
+        (isArray(lType) && lType.getTypeClass == rType.getTypeClass)
+
+    if (compatible) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType")
+    }
+  }
+}
+
+/**
+  * Inequality predicate, rendered as `left !== right`.
+  *
+  * Valid for two numeric operands, two operands of equal type, or an array on the left whose
+  * type class equals the one of the right operand.
+  */
+case class NotEqualTo(left: PlannerExpression, right: PlannerExpression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.NOT_EQUALS
+
+  override def toString: String = s"${left} !== ${right}"
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val lType = left.resultType
+    val rType = right.resultType
+
+    val compatible =
+      (isNumeric(lType) && isNumeric(rType)) ||
+        lType == rType ||
+        (isArray(lType) && lType.getTypeClass == rType.getTypeClass)
+
+    if (compatible) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Inequality predicate on incompatible types: $lType and $rType")
+    }
+  }
+}
+
+/** Comparison rendered as `left > right`. */
+case class GreaterThan(left: PlannerExpression, right: PlannerExpression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.GREATER_THAN
+
+  override def toString: String = s"${left} > ${right}"
+}
+
+/** Comparison rendered as `left >= right`. */
+case class GreaterThanOrEqual(left: PlannerExpression, right: PlannerExpression)
+  extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL
+
+  override def toString: String = s"${left} >= ${right}"
+}
+
+/** Comparison rendered as `left < right`. */
+case class LessThan(left: PlannerExpression, right: PlannerExpression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.LESS_THAN
+
+  override def toString: String = s"${left} < ${right}"
+}
+
+/** Comparison rendered as `left <= right`. */
+case class LessThanOrEqual(left: PlannerExpression, right: PlannerExpression)
+  extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL
+
+  override def toString: String = s"${left} <= ${right}"
+}
+
+/** Boolean predicate built with `RelBuilder.isNull` on the child expression. */
+case class IsNull(child: PlannerExpression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.isNull(child.toRexNode)
+
+  override def toString: String = s"(${child}).isNull"
+}
+
+/** Boolean predicate built with `RelBuilder.isNotNull` on the child expression. */
+case class IsNotNull(child: PlannerExpression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.isNotNull(child.toRexNode)
+
+  override def toString: String = s"(${child}).isNotNull"
+}
+
+/** Boolean predicate applying the `IS_TRUE` operator to the child expression. */
+case class IsTrue(child: PlannerExpression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.IS_TRUE, child.toRexNode)
+
+  override def toString: String = s"(${child}).isTrue"
+}
+
+/** Boolean predicate applying the `IS_FALSE` operator to the child expression. */
+case class IsFalse(child: PlannerExpression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.IS_FALSE, child.toRexNode)
+
+  override def toString: String = s"(${child}).isFalse"
+}
+
+/** Boolean predicate applying the `IS_NOT_TRUE` operator to the child expression. */
+case class IsNotTrue(child: PlannerExpression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.IS_NOT_TRUE, child.toRexNode)
+
+  override def toString: String = s"(${child}).isNotTrue"
+}
+
+/** Boolean predicate applying the `IS_NOT_FALSE` operator to the child expression. */
+case class IsNotFalse(child: PlannerExpression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.IS_NOT_FALSE, child.toRexNode)
+
+  override def toString: String = s"(${child}).isNotFalse"
+}
+
+/**
+  * Base class of the between predicates with a boolean result.
+  *
+  * Valid if the expression and both bounds are numeric, or if the expression is comparable
+  * and both bounds have exactly its type.
+  *
+  * @param expr expression that is tested
+  * @param lowerBound lower bound of the tested range
+  * @param upperBound upper bound of the tested range
+  */
+abstract class BetweenComparison(
+    expr: PlannerExpression,
+    lowerBound: PlannerExpression,
+    upperBound: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = Seq(expr, lowerBound, upperBound)
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val exprType = expr.resultType
+    val lowerType = lowerBound.resultType
+    val upperType = upperBound.resultType
+
+    val supported =
+      (isNumeric(exprType) && isNumeric(lowerType) && isNumeric(upperType)) ||
+        (isComparable(exprType) && exprType == lowerType && exprType == upperType)
+
+    if (supported) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"Between is only supported for numeric types and " +
+          s"identical comparable types, but got $exprType, $lowerType and $upperType"
+      )
+    }
+  }
+}
+
+/**
+  * Between predicate, translated to `expr >= lowerBound AND expr <= upperBound`.
+  */
+case class Between(
+    expr: PlannerExpression,
+    lowerBound: PlannerExpression,
+    upperBound: PlannerExpression)
+  extends BetweenComparison(expr, lowerBound, upperBound) {
+
+  override def toString: String = s"(${expr}).between(${lowerBound}, ${upperBound})"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // expr is translated once per comparison
+    val notBelowLower = relBuilder.call(
+      FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL,
+      expr.toRexNode,
+      lowerBound.toRexNode)
+    val notAboveUpper = relBuilder.call(
+      FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL,
+      expr.toRexNode,
+      upperBound.toRexNode)
+    relBuilder.and(notBelowLower, notAboveUpper)
+  }
+}
+
+/**
+  * Negated between predicate, translated to `expr < lowerBound OR expr > upperBound`.
+  */
+case class NotBetween(
+    expr: PlannerExpression,
+    lowerBound: PlannerExpression,
+    upperBound: PlannerExpression)
+  extends BetweenComparison(expr, lowerBound, upperBound) {
+
+  override def toString: String = s"(${expr}).notBetween(${lowerBound}, ${upperBound})"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // expr is translated once per comparison
+    val belowLower = relBuilder.call(
+      FlinkSqlOperatorTable.LESS_THAN,
+      expr.toRexNode,
+      lowerBound.toRexNode)
+    val aboveUpper = relBuilder.call(
+      FlinkSqlOperatorTable.GREATER_THAN,
+      expr.toRexNode,
+      upperBound.toRexNode)
+    relBuilder.or(belowLower, aboveUpper)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
new file mode 100644
index 0000000..1f858a1
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.UnresolvedException
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+/**
+  * Unresolved flattening of a composite type: `resultType` throws and `validateInput` always
+  * fails. All flattenings are resolved into `GetCompositeField` expressions.
+  */
+case class Flattening(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Invalid call to resultType on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ValidationResult =
+    ValidationFailure(s"Unresolved flattening of $child")
+}
+
+case class GetCompositeField(child: PlannerExpression, key: Any) extends UnaryExpression {
+
+  private var fieldIndex: Option[Int] = None // set only by a successful validateInput()
+
+  override def toString = s"$child.get($key)"
+
+  override private[flink] def validateInput(): ValidationResult = {
+    // check for composite type
+    if (!child.resultType.isInstanceOf[CompositeType[_]]) {
+      return ValidationFailure(s"Cannot access field of non-composite type '${child.resultType}'.")
+    }
+    val compositeType = child.resultType.asInstanceOf[CompositeType[_]]
+
+    // check key: a String field name or an Int position in [0, arity); anything else is rejected
+    key match {
+      case name: String =>
+        val index = compositeType.getFieldIndex(name)
+        if (index < 0) {
+          ValidationFailure(s"Field name '$name' could not be found.")
+        } else {
+          fieldIndex = Some(index)
+          ValidationSuccess
+        }
+      case index: Int =>
+        if (index < 0 || index >= compositeType.getArity) { // negative positions are invalid too
+          ValidationFailure(s"Field index '$index' exceeds arity.")
+        } else {
+          fieldIndex = Some(index)
+          ValidationSuccess
+        }
+      case _ =>
+        ValidationFailure(s"Invalid key '$key'.")
+    }
+  }
+
+  override private[flink] def resultType: TypeInformation[_] =
+    child.resultType.asInstanceOf[CompositeType[_]].getTypeAt(fieldIndex.get)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeFieldAccess(child.toRexNode, fieldIndex.get)
+  }
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
+    copy(child, key).asInstanceOf[this.type]
+  }
+
+  /**
+    * Gives a meaningful alias if possible (e.g. a$mypojo$field).
+    */
+  private[flink] def aliasName(): Option[String] = child match {
+    case gcf: GetCompositeField =>
+      val alias = gcf.aliasName()
+      if (alias.isDefined) {
+        Some(s"${alias.get}$$$key")
+      } else {
+        None
+      }
+    case c: ResolvedFieldReference =>
+      val keySuffix = if (key.isInstanceOf[Int]) s"_$key" else key
+      Some(s"${c.name}$$$keySuffix")
+    case _ => None
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
new file mode 100644
index 0000000..ffd526e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api._
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.calcite.FlinkTypeFactory._
+import org.apache.flink.table.functions.sql.StreamRecordTimestampSqlFunction
+import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+trait NamedExpression extends PlannerExpression {
+ private[flink] def name: String
+ private[flink] def toAttribute: Attribute
+}
+
+abstract class Attribute extends LeafExpression with NamedExpression {
+ override private[flink] def toAttribute: Attribute = this
+
+ private[flink] def withName(newName: String): Attribute
+}
+
+/**
+ * Dummy wrapper for expressions that were converted to RexNode in a different way.
+ */
+case class RexPlannerExpression(
+ private[flink] val rexNode: RexNode)
+ extends LeafExpression {
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ rexNode
+ }
+
+ override private[flink] def resultType: TypeInformation[_] =
+ fromLogicalTypeToTypeInfo(FlinkTypeFactory.toLogicalType(rexNode.getType))
+}
+
+case class UnresolvedFieldReference(name: String) extends Attribute {
+
+ override def toString = s"'$name"
+
+ override private[flink] def withName(newName: String): Attribute =
+ UnresolvedFieldReference(newName)
+
+ override private[flink] def resultType: TypeInformation[_] =
+ throw UnresolvedException(s"Calling resultType on ${this.getClass}.")
+
+ override private[flink] def validateInput(): ValidationResult =
+ ValidationFailure(s"Unresolved reference $name.")
+}
+
+case class PlannerResolvedFieldReference(
+ name: String,
+ resultType: TypeInformation[_]) extends Attribute with ResolvedFieldReference {
+
+ override def toString = s"'$name"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.field(name)
+ }
+
+ override private[flink] def withName(newName: String): Attribute = {
+ if (newName == name) {
+ this
+ } else {
+ PlannerResolvedFieldReference(newName, resultType)
+ }
+ }
+}
+
+case class Alias(child: PlannerExpression, name: String, extraNames: Seq[String] = Seq())
+ extends UnaryExpression with NamedExpression {
+
+ override def toString = s"$child as '$name"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.alias(child.toRexNode, name)
+ }
+
+ override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+ override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+ val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
+ copy(child, name, extraNames).asInstanceOf[this.type]
+ }
+
+ override private[flink] def toAttribute: Attribute = {
+ if (valid) {
+ PlannerResolvedFieldReference(name, child.resultType)
+ } else {
+ UnresolvedFieldReference(name)
+ }
+ }
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (name == "*") {
+ ValidationFailure("Alias can not accept '*' as name.")
+ } else {
+ ValidationSuccess
+ }
+ }
+}
+
+case class UnresolvedAlias(child: PlannerExpression) extends UnaryExpression with NamedExpression {
+
+ override private[flink] def name: String =
+ throw UnresolvedException("Invalid call to name on UnresolvedAlias")
+
+ override private[flink] def toAttribute: Attribute =
+ throw UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
+
+ override private[flink] def resultType: TypeInformation[_] =
+ throw UnresolvedException("Invalid call to resultType on UnresolvedAlias")
+
+ override private[flink] lazy val valid = false
+}
+
+case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute {
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+ throw new UnsupportedOperationException("A window reference can not be used solely.")
+
+ override private[flink] def resultType: TypeInformation[_] =
+ tpe.getOrElse(throw UnresolvedException("Could not resolve type of referenced window."))
+
+ override private[flink] def withName(newName: String): Attribute = {
+ if (newName == name) {
+ this
+ } else {
+ throw new ValidationException("Cannot rename window reference.")
+ }
+ }
+
+ override def toString: String = s"'$name"
+}
+
+case class TableReference(name: String, tableOperation: QueryOperation)
+ extends LeafExpression
+ with NamedExpression {
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+ throw new UnsupportedOperationException(s"Table reference '$name' can not be used solely.")
+
+ override private[flink] def resultType: TypeInformation[_] =
+ throw UnresolvedException(s"Table reference '$name' has no result type.")
+
+ override private[flink] def toAttribute =
+ throw new UnsupportedOperationException(s"A table reference '$name' can not be an attribute.")
+
+ override def toString: String = s"$name"
+}
+
+abstract class TimeAttribute(val expression: PlannerExpression)
+ extends UnaryExpression
+ with WindowProperty {
+
+ override private[flink] def child: PlannerExpression = expression
+}
+
+case class RowtimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) {
+
+ override private[flink] def validateInput(): ValidationResult = {
+ child match {
+ case WindowReference(_, Some(tpe: TypeInformation[_])) if isProctimeIndicatorType(tpe) =>
+ ValidationFailure("A proctime window cannot provide a rowtime attribute.")
+ case WindowReference(_, Some(tpe: TypeInformation[_])) if isRowtimeIndicatorType(tpe) =>
+ // rowtime window
+ ValidationSuccess
+ case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
+ // batch time window
+ ValidationSuccess
+ case WindowReference(_, _) =>
+ ValidationFailure("Reference to a rowtime or proctime window required.")
+ case any =>
+ ValidationFailure(
+ s"The '.rowtime' expression can only be used for table definitions and windows, " +
+ s"while [$any] was found.")
+ }
+ }
+
+ override def resultType: TypeInformation[_] = {
+ child match {
+ case WindowReference(_, Some(tpe: TypeInformation[_])) if isRowtimeIndicatorType(tpe) =>
+ // rowtime window
+ TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+ case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP =>
+ // batch time window
+ Types.SQL_TIMESTAMP
+ case _ =>
+ throw new TableException("RowtimeAttribute has invalid type. Please report this bug.")
+ }
+ }
+
+ override def toNamedWindowProperty(name: String): NamedWindowProperty =
+ NamedWindowProperty(name, this)
+
+ override def toString: String = s"rowtime($child)"
+}
+
+case class ProctimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) {
+
+ override private[flink] def validateInput(): ValidationResult = {
+ child match {
+ case WindowReference(_, Some(tpe: TypeInformation[_])) if isTimeIndicatorType(tpe) =>
+ ValidationSuccess
+ case WindowReference(_, _) =>
+ ValidationFailure("Reference to a rowtime or proctime window required.")
+ case any =>
+ ValidationFailure(
+ "The '.proctime' expression can only be used for table definitions and windows, " +
+ s"while [$any] was found.")
+ }
+ }
+
+ override def resultType: TypeInformation[_] =
+ TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+
+ override def toNamedWindowProperty(name: String): NamedWindowProperty =
+ NamedWindowProperty(name, this)
+
+ override def toString: String = s"proctime($child)"
+}
+
+/** Expression to access the timestamp of a StreamRecord. */
+case class StreamRecordTimestamp() extends LeafExpression {
+
+ override private[flink] def resultType = Types.LONG
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.getRexBuilder.makeCall(new StreamRecordTimestampSqlFunction)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/hashExpressions.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/hashExpressions.scala
new file mode 100644
index 0000000..b24d46c
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/hashExpressions.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+
+case class Md5(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+ override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+ override def toString: String = s"($child).md5()"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(FlinkSqlOperatorTable.MD5, child.toRexNode)
+ }
+}
+
+case class Sha1(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+ override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+ override def toString: String = s"($child).sha1()"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(FlinkSqlOperatorTable.SHA1, child.toRexNode)
+ }
+}
+
+case class Sha224(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+ override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+ override def toString: String = s"($child).sha224()"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(FlinkSqlOperatorTable.SHA224, child.toRexNode)
+ }
+}
+
+case class Sha256(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+ override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+ override def toString: String = s"($child).sha256()"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(FlinkSqlOperatorTable.SHA256, child.toRexNode)
+ }
+}
+
+case class Sha384(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+ override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+ override def toString: String = s"($child).sha384()"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(FlinkSqlOperatorTable.SHA384, child.toRexNode)
+ }
+}
+
+case class Sha512(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+ override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] = STRING_TYPE_INFO :: Nil
+
+ override def toString: String = s"($child).sha512()"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(FlinkSqlOperatorTable.SHA512, child.toRexNode)
+ }
+}
+
+case class Sha2(child: PlannerExpression, hashLength: PlannerExpression)
+ extends BinaryExpression with InputTypeSpec {
+
+ override private[flink] def left = child
+ override private[flink] def right = hashLength
+
+ override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+ STRING_TYPE_INFO :: INT_TYPE_INFO :: Nil
+
+ override def toString: String = s"($child).sha2($hashLength)"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(FlinkSqlOperatorTable.SHA2, left.toRexNode, right.toRexNode)
+ }
+
+}
+
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala
new file mode 100644
index 0000000..b33ca66
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import java.sql.{Date, Time, Timestamp}
+import java.util.{Calendar, TimeZone}
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlIntervalQualifier
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.{DateString, TimeString, TimestampString}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+object Literal {
+ private[flink] val UTC = TimeZone.getTimeZone("UTC")
+
+ private[flink] def apply(l: Any): Literal = l match {
+ case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
+ case s: Short => Literal(s, BasicTypeInfo.SHORT_TYPE_INFO)
+ case b: Byte => Literal(b, BasicTypeInfo.BYTE_TYPE_INFO)
+ case l: Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
+ case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
+ case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
+ case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
+ case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+ case javaDec: java.math.BigDecimal => Literal(javaDec, BasicTypeInfo.BIG_DEC_TYPE_INFO)
+ case scalaDec: scala.math.BigDecimal =>
+ Literal(scalaDec.bigDecimal, BasicTypeInfo.BIG_DEC_TYPE_INFO)
+ case sqlDate: Date => Literal(sqlDate, SqlTimeTypeInfo.DATE)
+ case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
+ case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
+ }
+}
+
+case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
+ override def toString: String = resultType match {
+ case _: BasicTypeInfo[_] => value.toString
+ case _@SqlTimeTypeInfo.DATE => value.toString + ".toDate"
+ case _@SqlTimeTypeInfo.TIME => value.toString + ".toTime"
+ case _@SqlTimeTypeInfo.TIMESTAMP => value.toString + ".toTimestamp"
+ case _@TimeIntervalTypeInfo.INTERVAL_MILLIS => value.toString + ".millis"
+ case _@TimeIntervalTypeInfo.INTERVAL_MONTHS => value.toString + ".months"
+ case _ => s"Literal($value, $resultType)"
+ }
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ resultType match {
+ case BasicTypeInfo.BIG_DEC_TYPE_INFO =>
+ val bigDecValue = value.asInstanceOf[java.math.BigDecimal]
+ val decType = relBuilder.getTypeFactory.createSqlType(SqlTypeName.DECIMAL)
+ relBuilder.getRexBuilder.makeExactLiteral(bigDecValue, decType)
+
+ // create BIGINT literals for long type
+ case BasicTypeInfo.LONG_TYPE_INFO =>
+ val bigint = java.math.BigDecimal.valueOf(value.asInstanceOf[Long])
+ relBuilder.getRexBuilder.makeBigintLiteral(bigint)
+
+ // date/time
+ case SqlTimeTypeInfo.DATE =>
+ val datestr = DateString.fromCalendarFields(valueAsCalendar)
+ relBuilder.getRexBuilder.makeDateLiteral(datestr)
+ case SqlTimeTypeInfo.TIME =>
+ val timestr = TimeString.fromCalendarFields(valueAsCalendar)
+ relBuilder.getRexBuilder.makeTimeLiteral(timestr, 0)
+ case SqlTimeTypeInfo.TIMESTAMP =>
+ val timestampstr = TimestampString.fromCalendarFields(valueAsCalendar)
+ relBuilder.getRexBuilder.makeTimestampLiteral(timestampstr, 3)
+
+ case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
+ val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Int])
+ val intervalQualifier = new SqlIntervalQualifier(
+ TimeUnit.YEAR,
+ TimeUnit.MONTH,
+ SqlParserPos.ZERO)
+ relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
+
+ case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
+ val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Long])
+ val intervalQualifier = new SqlIntervalQualifier(
+ TimeUnit.DAY,
+ TimeUnit.SECOND,
+ SqlParserPos.ZERO)
+ relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
+
+ case _ => relBuilder.literal(value)
+ }
+ }
+
+ /**
+ * Convert a Date value to a Calendar. Calcite's fromCalendarFields functions use the
+ * Calendar.get methods, so the raw values of the individual fields are preserved when
+ * converted to the String formats.
+ *
+ * @return the value as a Calendar
+ */
+ private def valueAsCalendar: Calendar = {
+ val date = value.asInstanceOf[java.util.Date]
+ val cal = Calendar.getInstance
+ cal.setTime(date)
+ cal
+ }
+}
+
+@deprecated(
+ "Use nullOf(TypeInformation) instead. It is available through the implicit Scala DSL.",
+ "1.8.0")
+case class Null(resultType: TypeInformation[_]) extends LeafExpression {
+ override def toString = s"null"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ val rexBuilder = relBuilder.getRexBuilder
+ val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+ rexBuilder
+ .makeCast(
+ typeFactory.createFieldTypeFromLogicalType(
+ TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType(resultType)),
+ rexBuilder.constantNull())
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/logic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/logic.scala
new file mode 100644
index 0000000..838261c
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/logic.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.validate._
+
+abstract class BinaryPredicate extends BinaryExpression {
+ override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (left.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+ right.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(s"$this only accepts children of Boolean type, " +
+ s"get $left : ${left.resultType} and $right : ${right.resultType}")
+ }
+ }
+}
+
+case class Not(child: PlannerExpression) extends UnaryExpression {
+
+ override def toString = s"!($child)"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.not(child.toRexNode)
+ }
+
+ override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (child.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(s"Not operator requires a boolean expression as input, " +
+ s"but $child is of type ${child.resultType}")
+ }
+ }
+}
+
+case class And(left: PlannerExpression, right: PlannerExpression) extends BinaryPredicate {
+
+ override def toString = s"$left && $right"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.and(left.toRexNode, right.toRexNode)
+ }
+}
+
+case class Or(left: PlannerExpression, right: PlannerExpression) extends BinaryPredicate {
+
+ override def toString = s"$left || $right"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.or(left.toRexNode, right.toRexNode)
+ }
+}
+
+@deprecated(
+ "Use ifThenElse(...) instead. It is available through the implicit Scala DSL.",
+ "1.8.0")
+case class If(
+ condition: PlannerExpression,
+ ifTrue: PlannerExpression,
+ ifFalse: PlannerExpression)
+ extends PlannerExpression {
+ private[flink] def children = Seq(condition, ifTrue, ifFalse)
+
+ override private[flink] def resultType = ifTrue.resultType
+
+ override def toString = s"($condition)? $ifTrue : $ifFalse"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ val c = condition.toRexNode
+ val t = ifTrue.toRexNode
+ val f = ifFalse.toRexNode
+ relBuilder.call(FlinkSqlOperatorTable.CASE, c, t, f)
+ }
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (condition.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+ ifTrue.resultType == ifFalse.resultType) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(
+ s"If should have boolean condition and same type of ifTrue and ifFalse, get " +
+ s"(${condition.resultType}, ${ifTrue.resultType}, ${ifFalse.resultType})")
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
new file mode 100644
index 0000000..d52adfb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+/** Absolute value of a numeric `child` via the ABS operator; keeps the child's result type. */
+case class Abs(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"abs($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Abs")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.ABS, child.toRexNode)
+}
+
+/** Call to the CEIL operator on a numeric `child`; the result is typed as LONG. */
+case class Ceil(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"ceil($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Ceil")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.CEIL, child.toRexNode)
+}
+
+/** Call to the EXP operator; expects a DOUBLE operand and is typed as DOUBLE. */
+case class Exp(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override def toString: String = s"exp($child)"
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(DOUBLE_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.EXP, child.toRexNode)
+}
+
+
+/** Call to the FLOOR operator on a numeric `child`; the result is typed as LONG. */
+case class Floor(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"floor($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Floor")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.FLOOR, child.toRexNode)
+}
+
+/** Call to the LOG10 operator; expects a DOUBLE operand and is typed as DOUBLE. */
+case class Log10(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override def toString: String = s"log10($child)"
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(DOUBLE_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.LOG10, child.toRexNode)
+}
+
+/** Call to the LOG2 operator; expects a DOUBLE operand and is typed as DOUBLE. */
+case class Log2(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override def toString: String = s"log2($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(DOUBLE_TYPE_INFO)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
+    relBuilder.call(FlinkSqlOperatorTable.LOG2, child.toRexNode)
+}
+
+/** Call to the COSH operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Cosh(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString = s"cosh($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Cosh")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
+    relBuilder.call(FlinkSqlOperatorTable.COSH, child.toRexNode)
+}
+
+/** Call to the LOG operator, typed as DOUBLE; a `null` base is left out of the operands. */
+case class Log(base: PlannerExpression, antilogarithm: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+  def this(antilogarithm: PlannerExpression) = this(E(), antilogarithm)
+
+  override def toString: String = s"log(${children.mkString(",")})"
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    Option(base).toList :+ antilogarithm
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    children.map(_ => DOUBLE_TYPE_INFO)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.LOG, children.map(_.toRexNode))
+}
+
+// NOTE(review): this factory leaves `base` null, while the auxiliary constructor uses E().
+object Log {
+  def apply(antilogarithm: PlannerExpression): Log = new Log(null, antilogarithm)
+}
+
+/** Call to the LN operator; expects a DOUBLE operand and is typed as DOUBLE. */
+case class Ln(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override def toString: String = s"ln($child)"
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(DOUBLE_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.LN, child.toRexNode)
+}
+
+/** Call to the POWER operator; both operands are expected to be DOUBLE, as is the result. */
+case class Power(left: PlannerExpression, right: PlannerExpression)
+  extends BinaryExpression with InputTypeSpec {
+  override def toString: String = s"pow($left, $right)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(DOUBLE_TYPE_INFO, DOUBLE_TYPE_INFO)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.POWER, left.toRexNode, right.toRexNode)
+}
+
+/** Call to the SINH operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Sinh(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString = s"sinh($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Sinh")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
+    relBuilder.call(FlinkSqlOperatorTable.SINH, child.toRexNode)
+}
+
+/** Square root of a DOUBLE `child`, typed as DOUBLE. */
+case class Sqrt(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+  override def toString: String = s"sqrt($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  // Expressed as POWER(child, 0.5) rather than through a dedicated operator.
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.POWER, child.toRexNode, Literal(0.5).toRexNode)
+}
+
+/** Call to the SIN operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Sin(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"sin($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Sin")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.SIN, child.toRexNode)
+}
+
+/** Call to the COS operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Cos(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"cos($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Cos")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.COS, child.toRexNode)
+}
+
+/** Call to the TAN operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Tan(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"tan($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Tan")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.TAN, child.toRexNode)
+}
+
+/** Call to the TANH operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Tanh(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString = s"tanh($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Tanh")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
+    relBuilder.call(FlinkSqlOperatorTable.TANH, child.toRexNode)
+}
+
+/** Call to the COT operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Cot(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"cot($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Cot")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.COT, child.toRexNode)
+}
+
+/** Call to the ASIN operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Asin(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"asin($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Asin")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.ASIN, child.toRexNode)
+}
+
+/** Call to the ACOS operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Acos(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"acos($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Acos")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.ACOS, child.toRexNode)
+}
+
+/** Call to the ATAN operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Atan(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"atan($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Atan")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.ATAN, child.toRexNode)
+}
+
+/** Call to the ATAN2 operator with numeric operands `y` and `x`; typed as DOUBLE. */
+case class Atan2(y: PlannerExpression, x: PlannerExpression) extends BinaryExpression {
+  override private[flink] def left = y
+  override private[flink] def right = x
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  // Both operands must be numeric; the first failing check is the one returned.
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(y.resultType, "atan2") match {
+      case ValidationSuccess => TypeInfoCheckUtils.assertNumericExpr(x.resultType, "atan2")
+      case failure => failure
+    }
+
+  override def toString: String = s"atan2($left, $right)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.ATAN2, left.toRexNode, right.toRexNode)
+}
+
+/** Call to the DEGREES operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Degrees(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"degrees($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Degrees")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.DEGREES, child.toRexNode)
+}
+
+/** Call to the RADIANS operator on a numeric `child`; the result is typed as DOUBLE. */
+case class Radians(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"radians($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "Radians")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.RADIANS, child.toRexNode)
+}
+
+/** Call to the SIGN operator on a numeric `child`; keeps the child's result type. */
+case class Sign(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"sign($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertNumericExpr(child.resultType, "sign")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.SIGN, child.toRexNode)
+}
+
+/** Call to the ROUND operator on numeric `left` with integer `right`; typed like `left`. */
+case class Round(left: PlannerExpression, right: PlannerExpression) extends BinaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = left.resultType
+
+  // A non-integer `right` must fail validation; otherwise the numeric check on `left` decides.
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isInteger(right.resultType)) {
+      ValidationFailure(s"round right requires int, get $right : ${right.resultType}")
+    } else {
+      TypeInfoCheckUtils.assertNumericExpr(left.resultType, s"round left :$left")
+    }
+  }
+
+  override def toString: String = s"round($left, $right)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.ROUND, left.toRexNode, right.toRexNode)
+}
+
+/** Leaf expression translated to a call of the PI operator; typed as DOUBLE. */
+case class Pi() extends LeafExpression {
+  override def toString: String = "pi()"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.PI)
+}
+
+/** Leaf expression translated to a call of the E operator; typed as DOUBLE. */
+case class E() extends LeafExpression {
+  override def toString: String = "e()"
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.E)
+}
+
+/**
+ * Call to the RAND operator, typed as DOUBLE.
+ *
+ * `seed` is optional: the no-argument constructor stores `null`, in which case the
+ * expression has no children and no operand is passed to RAND.
+ */
+case class Rand(seed: PlannerExpression) extends PlannerExpression with InputTypeSpec {
+
+  def this() = this(null)
+
+  // Empty when no seed was given, otherwise the single seed operand.
+  override private[flink] def children: Seq[PlannerExpression] =
+    Option(seed).toList
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.DOUBLE_TYPE_INFO
+
+  // One INT entry per child, i.e. only for the seed when it is present.
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    children.map(_ => INT_TYPE_INFO)
+
+  override def toString: String = seed match {
+    case null => "rand()"
+    case value => s"rand($value)"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.RAND, children.map(_.toRexNode))
+}
+
+/**
+ * Call to the RAND_INTEGER operator, typed as INT.
+ *
+ * `bound` is always passed; `seed` is optional and is `null` when the one-argument
+ * constructor is used, in which case only `bound` becomes an operand.
+ */
+case class RandInteger(seed: PlannerExpression, bound: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  def this(bound: PlannerExpression) = this(null, bound)
+
+  // The seed (when present) precedes the bound.
+  override private[flink] def children: Seq[PlannerExpression] =
+    Option(seed).toList :+ bound
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.INT_TYPE_INFO
+
+  // Every operand is expected to be INT.
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    children.map(_ => INT_TYPE_INFO)
+
+  override def toString: String = seed match {
+    case null => s"randInteger($bound)"
+    case value => s"randInteger($value, $bound)"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.RAND_INTEGER, children.map(_.toRexNode))
+}
+
+/** Call to the BIN operator on an integer-family `child`; the result is typed as STRING. */
+case class Bin(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"bin($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeInfoCheckUtils.assertIntegerFamilyExpr(child.resultType, "Bin")
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.BIN, child.toRexNode)
+}
+
+/** Call to the HEX operator on an integer-family or string `child`; typed as STRING. */
+case class Hex(child: PlannerExpression) extends UnaryExpression {
+  override def toString: String = s"hex($child)"
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isIntegerFamily(child.resultType) &&
+      !TypeInfoCheckUtils.isString(child.resultType)) {
+      ValidationFailure(s"hex() requires an integer or string input but was '${child.resultType}'.")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.HEX, child.toRexNode)
+}
+
+/** Leaf expression translated to a call of the UUID operator; typed as STRING. */
+case class UUID() extends LeafExpression {
+  override def toString: String = "uuid()"
+
+  override private[flink] def resultType = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.UUID)
+}
+
+/** Call to the TRUNCATE operator on numeric `base`; the optional `num` must be an integer. */
+case class Truncate(base: PlannerExpression, num: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+  def this(base: PlannerExpression) = this(base, null)
+
+  override def toString: String = s"truncate(${children.mkString(",")})"
+
+  override private[flink] def resultType: TypeInformation[_] = base.resultType
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    if (num == null) Seq(base) else Seq(base, num)
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    if (num == null) Seq(base.resultType) else Seq(base.resultType, INT_TYPE_INFO)
+
+  // A non-integer `num` must fail validation; otherwise the numeric check on `base` decides.
+  override private[flink] def validateInput(): ValidationResult = {
+    if (num != null && !TypeInfoCheckUtils.isInteger(num.resultType)) {
+      ValidationFailure(s"truncate num requires int, get $num : ${num.resultType}")
+    } else {
+      TypeInfoCheckUtils.assertNumericExpr(base.resultType, s"truncate base :$base")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.TRUNCATE, children.map(_.toRexNode))
+}
+
+// Single-argument factory: `num` stays null, so only `base` becomes an operand.
+object Truncate {
+  def apply(base: PlannerExpression): Truncate = Truncate(base, null)
+}
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ordering.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ordering.scala
new file mode 100644
index 0000000..2b668d9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ordering.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.validate._
+
+/**
+ * Base class of Asc/Desc; validation requires the child to be a [[NamedExpression]].
+ */
+abstract class Ordering extends UnaryExpression {
+  override private[flink] def validateInput(): ValidationResult = child match {
+    case _: NamedExpression => ValidationSuccess
+    case _ => ValidationFailure("Sort should only based on field reference")
+  }
+}
+
+/** Ascending ordering; translates to the child's own RexNode without any wrapper. */
+case class Asc(child: PlannerExpression) extends Ordering {
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    child.toRexNode
+
+  override def toString: String = s"($child).asc"
+}
+
+/** Descending ordering; wraps the child's RexNode with `RelBuilder.desc`. */
+case class Desc(child: PlannerExpression) extends Ordering {
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.desc(child.toRexNode)
+
+  override def toString: String = s"($child).desc"
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
new file mode 100644
index 0000000..ce1966f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+/** Marker expression rendered as `CURRENT ROW`; typed as LONG and without children. */
+case class CurrentRow() extends PlannerExpression {
+  override def toString = "CURRENT ROW"
+
+  override private[flink] def children = Seq.empty
+  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+/** Marker expression rendered as `CURRENT RANGE`; typed as INTERVAL_MILLIS, no children. */
+case class CurrentRange() extends PlannerExpression {
+  override def toString = "CURRENT RANGE"
+
+  override private[flink] def children = Seq.empty
+  override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+}
+
+/** Marker expression rendered as `UNBOUNDED ROW`; typed as LONG and without children. */
+case class UnboundedRow() extends PlannerExpression {
+  override def toString = "UNBOUNDED ROW"
+
+  override private[flink] def children = Seq.empty
+  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+/** Marker expression rendered as `UNBOUNDED RANGE`; typed as INTERVAL_MILLIS, no children. */
+case class UnboundedRange() extends PlannerExpression {
+  override def toString = "UNBOUNDED RANGE"
+
+  override private[flink] def children = Seq.empty
+  override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/package.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/package.scala
new file mode 100644
index 0000000..41e0c9f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+/**
+ * This package contains the base class of AST nodes and all the expression language AST classes.
+ * Expression trees should not be constructed manually by users. In the Scala API they are
+ * created through the implicit DSL conversions in
+ * [[org.apache.flink.table.api.scala.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]]. For the Java API,
+ * expression trees should be generated by a string parser that parses expressions and creates
+ * the AST nodes.
+ */
+package object expressions
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
new file mode 100644
index 0000000..9ef0252
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.expressions.PlannerTrimMode.PlannerTrimMode
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner expression for the character length of a string operand.
+ *
+ * The operand must be of type String; the result type is Int. Translated to
+ * `FlinkSqlOperatorTable.CHAR_LENGTH`.
+ */
+case class CharLength(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString: String = s"($child).charLength()"
+
+  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val inputType = child.resultType
+    if (inputType != STRING_TYPE_INFO) {
+      ValidationFailure(
+        s"CharLength operator requires String input, but $child is of type $inputType")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.CHAR_LENGTH, child.toRexNode)
+}
+
+/**
+ * Planner expression that puts the first letter of every word of the operand in uppercase and
+ * all other letters in lowercase. Words are delimited by white space.
+ *
+ * The operand must be of type String; the result type is String. Translated to
+ * `FlinkSqlOperatorTable.INITCAP`.
+ */
+case class InitCap(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString: String = s"($child).initCap()"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val inputType = child.resultType
+    if (inputType != STRING_TYPE_INFO) {
+      ValidationFailure(
+        s"InitCap operator requires String input, but $child is of type $inputType")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.INITCAP, child.toRexNode)
+}
+
+/**
+ * Planner expression that is true if `str` matches `pattern`.
+ *
+ * Both operands must be of type String; the result type is Boolean. Translated to
+ * `FlinkSqlOperatorTable.LIKE`.
+ */
+case class Like(str: PlannerExpression, pattern: PlannerExpression) extends BinaryExpression {
+
+  private[flink] def left: PlannerExpression = str
+  private[flink] def right: PlannerExpression = pattern
+
+  override def toString: String = s"($str).like($pattern)"
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val strType = str.resultType
+    val patternType = pattern.resultType
+    val bothStrings = strType == STRING_TYPE_INFO && patternType == STRING_TYPE_INFO
+    if (bothStrings) {
+      ValidationSuccess
+    } else {
+      ValidationFailure("Like operator requires (String, String) input, " +
+        s"but ($str, $pattern) is of type ($strType, $patternType)")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.LIKE, children.map(_.toRexNode))
+}
+
+/**
+ * Planner expression that changes all characters of the operand to lowercase.
+ *
+ * The operand must be of type String; the result type is String. Translated to
+ * `FlinkSqlOperatorTable.LOWER`.
+ */
+case class Lower(child: PlannerExpression) extends UnaryExpression {
+
+  override def toString: String = s"($child).lowerCase()"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val inputType = child.resultType
+    if (inputType != STRING_TYPE_INFO) {
+      ValidationFailure(
+        s"Lower operator requires String input, but $child is of type $inputType")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.LOWER, child.toRexNode)
+}
+
+/**
+ * Planner expression that is true if `str` is similar to `pattern`.
+ *
+ * Both operands must be of type String; the result type is Boolean. Translated to
+ * `FlinkSqlOperatorTable.SIMILAR_TO`.
+ */
+case class Similar(str: PlannerExpression, pattern: PlannerExpression) extends BinaryExpression {
+
+  private[flink] def left: PlannerExpression = str
+  private[flink] def right: PlannerExpression = pattern
+
+  override def toString: String = s"($str).similarTo($pattern)"
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val strType = str.resultType
+    val patternType = pattern.resultType
+    val bothStrings = strType == STRING_TYPE_INFO && patternType == STRING_TYPE_INFO
+    if (bothStrings) {
+      ValidationSuccess
+    } else {
+      ValidationFailure("Similar operator requires (String, String) input, " +
+        s"but ($str, $pattern) is of type ($strType, $patternType)")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.SIMILAR_TO, children.map(_.toRexNode))
+}
+
+/**
+ * Planner expression for the substring of `str` that starts at `begin` (inclusive) and spans
+ * `length`.
+ *
+ * Expects operands of type (String, Int, Int) and produces a String. Translated to
+ * `FlinkSqlOperatorTable.SUBSTRING`.
+ */
+case class Substring(
+    str: PlannerExpression,
+    begin: PlannerExpression,
+    length: PlannerExpression) extends PlannerExpression with InputTypeSpec {
+
+  /** Two-argument form: `length` defaults to the character length of `str`. */
+  def this(str: PlannerExpression, begin: PlannerExpression) = this(str, begin, CharLength(str))
+
+  override def toString: String = s"($str).substring($begin, $length)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    List(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = List(str, begin, length)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.SUBSTRING, children.map(_.toRexNode))
+}
+
+/**
+ * Planner expression that trims `trimString` from `str` according to `trimMode`.
+ *
+ * `trimMode` must be a symbol expression wrapping a `PlannerTrimMode` value, and both
+ * `trimString` and `str` must be of type String. The result type is String. Translated to
+ * `FlinkSqlOperatorTable.TRIM`.
+ */
+case class Trim(
+    trimMode: PlannerExpression,
+    trimString: PlannerExpression,
+    str: PlannerExpression) extends PlannerExpression {
+
+  override def toString: String = s"($str).trim($trimMode, $trimString)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    List(trimMode, trimString, str)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    // the mode is checked first; operand types are only inspected for a valid mode symbol
+    val hasTrimModeSymbol = trimMode match {
+      case SymbolPlannerExpression(_: PlannerTrimMode) => true
+      case _ => false
+    }
+    if (!hasTrimModeSymbol) {
+      ValidationFailure("TrimMode symbol expected.")
+    } else if (trimString.resultType != STRING_TYPE_INFO) {
+      ValidationFailure(s"String expected for trimString, get ${trimString.resultType}")
+    } else if (str.resultType != STRING_TYPE_INFO) {
+      ValidationFailure(s"String expected for str, get ${str.resultType}")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.TRIM, children.map(_.toRexNode))
+}
+
+/**
+ * Constants for trim expressions.
+ */
+object TrimConstants {
+ // Literal of a single space; judging by its name, the default character to trim.
+ val TRIM_DEFAULT_CHAR = Literal(" ")
+}
+
+/**
+ * Planner expression that changes all characters of the operand to uppercase.
+ *
+ * Expects a String operand and produces a String. Translated to
+ * `FlinkSqlOperatorTable.UPPER`.
+ */
+case class Upper(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override def toString: String = s"($child).upperCase()"
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = List(STRING_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.UPPER, child.toRexNode)
+}
+
+/**
+ * Planner expression for the position of the string `needle` in the string `haystack`.
+ *
+ * Expects operands of type (String, String) and produces an Int. Translated to
+ * `FlinkSqlOperatorTable.POSITION` with the operands in the order (needle, haystack).
+ */
+case class Position(needle: PlannerExpression, haystack: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  override def toString: String = s"($needle).position($haystack)"
+
+  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    List(STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = List(needle, haystack)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val needleRex = needle.toRexNode
+    val haystackRex = haystack.toRexNode
+    relBuilder.call(FlinkSqlOperatorTable.POSITION, needleRex, haystackRex)
+  }
+}
+
+/**
+ * Planner expression that replaces a substring of `str` with `replacement`, starting at a
+ * position for a given length.
+ *
+ * Expects operands of type (String, String, Int, Int) and produces a String. Translated to
+ * `FlinkSqlOperatorTable.OVERLAY`.
+ *
+ * NOTE(review): the fourth parameter is named `position`, but it defaults to the character
+ * length of `replacement`, so it presumably carries the length of the replaced range - confirm.
+ */
+case class Overlay(
+    str: PlannerExpression,
+    replacement: PlannerExpression,
+    starting: PlannerExpression,
+    position: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  /** Three-argument form: the last operand defaults to the character length of `replacement`. */
+  def this(str: PlannerExpression, replacement: PlannerExpression, starting: PlannerExpression) =
+    this(str, replacement, starting, CharLength(replacement))
+
+  override def toString: String = s"($str).overlay($replacement, $starting, $position)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    List(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    List(str, replacement, starting, position)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val strRex = str.toRexNode
+    val replacementRex = replacement.toRexNode
+    val startingRex = starting.toRexNode
+    val positionRex = position.toRexNode
+    relBuilder.call(
+      FlinkSqlOperatorTable.OVERLAY, strRex, replacementRex, startingRex, positionRex)
+  }
+}
+
+/**
+ * Planner expression for the string that results from concatenating the arguments.
+ * The result is NULL if any argument is NULL.
+ *
+ * Every argument is expected to be of type String; the result type is String. Translated to
+ * `FlinkSqlOperatorTable.CONCAT`.
+ */
+case class Concat(strings: Seq[PlannerExpression]) extends PlannerExpression with InputTypeSpec {
+
+  override def toString: String = s"concat($strings)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  // one String entry per argument
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    children.map(_ => STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = strings
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.CONCAT, children.map(_.toRexNode))
+}
+
+/**
+ * Planner expression for the string that results from concatenating the arguments and the
+ * separator. The result is NULL if the separator is NULL.
+ *
+ * Note: this function does not skip empty strings. However, it does skip any NULL
+ * values after the separator argument.
+ *
+ * The separator and every argument are expected to be of type String; the result type is
+ * String. Translated to `FlinkSqlOperatorTable.CONCAT_WS` with the separator as first operand.
+ */
+case class ConcatWs(separator: PlannerExpression, strings: Seq[PlannerExpression])
+  extends PlannerExpression with InputTypeSpec {
+
+  override def toString: String = s"concat_ws($separator, $strings)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  // one String entry for the separator and one per argument
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    children.map(_ => STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = separator +: strings
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.CONCAT_WS, children.map(_.toRexNode))
+}
+
+/**
+ * Planner expression for the LPAD operator applied to (`text`, `len`, `pad`).
+ *
+ * Expects operands of type (String, Int, String) and produces a String. Translated to
+ * `FlinkSqlOperatorTable.LPAD`.
+ */
+case class Lpad(text: PlannerExpression, len: PlannerExpression, pad: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  override def toString: String = s"($text).lpad($len, $pad)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    List(STRING_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = List(text, len, pad)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.LPAD, children.map(_.toRexNode))
+}
+
+/**
+ * Planner expression for the RPAD operator applied to (`text`, `len`, `pad`).
+ *
+ * Expects operands of type (String, Int, String) and produces a String. Translated to
+ * `FlinkSqlOperatorTable.RPAD`.
+ */
+case class Rpad(text: PlannerExpression, len: PlannerExpression, pad: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  override def toString: String = s"($text).rpad($len, $pad)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    List(STRING_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = List(text, len, pad)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.RPAD, children.map(_.toRexNode))
+}
+
+/**
+ * Planner expression for a string in which all substrings that match the regular expression
+ * are consecutively replaced.
+ *
+ * Expects operands of type (String, String, String) and produces a String. Translated to
+ * `FlinkSqlOperatorTable.REGEXP_REPLACE`.
+ */
+case class RegexpReplace(
+    str: PlannerExpression,
+    regex: PlannerExpression,
+    replacement: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  override def toString: String = s"($str).regexp_replace($regex, $replacement)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    List(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = List(str, regex, replacement)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.REGEXP_REPLACE, children.map(_.toRexNode))
+}
+
+/**
+ * Planner expression for a string extracted with a specified regular expression and a regex
+ * match group index.
+ *
+ * `extractIndex` may be `null` (the two-argument form). In that case only `str` and `regex` are
+ * passed on as operands and only (String, String) is expected; otherwise the operands are
+ * (String, String, Int). The result type is String. Translated to
+ * `FlinkSqlOperatorTable.REGEXP_EXTRACT`.
+ */
+case class RegexpExtract(
+    str: PlannerExpression,
+    regex: PlannerExpression,
+    extractIndex: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  /** Two-argument form without a match group index (`extractIndex` is `null`). */
+  def this(str: PlannerExpression, regex: PlannerExpression) = this(str, regex, null)
+
+  override def toString: String = s"($str).regexp_extract($regex, $extractIndex)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Option(extractIndex) match {
+      case Some(_) => List(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO)
+      case None => List(STRING_TYPE_INFO, STRING_TYPE_INFO)
+    }
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    Option(extractIndex) match {
+      case Some(index) => List(str, regex, index)
+      case None => List(str, regex)
+    }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.REGEXP_EXTRACT, children.map(_.toRexNode))
+}
+
+object RegexpExtract {
+  /** Creates the two-argument form, i.e. an expression without a match group index. */
+  def apply(str: PlannerExpression, regex: PlannerExpression): RegexpExtract =
+    new RegexpExtract(str, regex)
+}
+
+/**
+ * Planner expression for the base string decoded with base64.
+ * The result is NULL if the input string is NULL.
+ *
+ * The operand must be of type String; the result type is String. Translated to
+ * `FlinkSqlOperatorTable.FROM_BASE64`.
+ */
+case class FromBase64(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override def toString: String = s"($child).fromBase64"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = List(STRING_TYPE_INFO)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val inputType = child.resultType
+    if (inputType != STRING_TYPE_INFO) {
+      ValidationFailure(
+        s"FromBase64 operator requires String input, but $child is of type $inputType")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.FROM_BASE64, child.toRexNode)
+
+}
+
+/**
+ * Planner expression for the base64-encoded result of the input string.
+ *
+ * The operand must be of type String; the result type is String. Translated to
+ * `FlinkSqlOperatorTable.TO_BASE64`.
+ */
+case class ToBase64(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override def toString: String = s"($child).toBase64"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = List(STRING_TYPE_INFO)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val inputType = child.resultType
+    if (inputType != STRING_TYPE_INFO) {
+      ValidationFailure(
+        s"ToBase64 operator requires a String input, but $child is of type $inputType")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.TO_BASE64, child.toRexNode)
+
+}
+
+/**
+ * Planner expression for a string that removes the left whitespaces from the given string.
+ *
+ * The operand must be of type String; the result type is String. Translated to
+ * `FlinkSqlOperatorTable.LTRIM`.
+ */
+case class LTrim(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override def toString: String = s"($child).ltrim"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = List(STRING_TYPE_INFO)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val inputType = child.resultType
+    if (inputType != STRING_TYPE_INFO) {
+      ValidationFailure(
+        s"LTrim operator requires a String input, but $child is of type $inputType")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.LTRIM, child.toRexNode)
+}
+
+/**
+ * Planner expression for a string that removes the right whitespaces from the given string.
+ *
+ * The operand must be of type String; the result type is String. Translated to
+ * `FlinkSqlOperatorTable.RTRIM`.
+ */
+case class RTrim(child: PlannerExpression) extends UnaryExpression with InputTypeSpec {
+
+  override def toString: String = s"($child).rtrim"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = List(STRING_TYPE_INFO)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val inputType = child.resultType
+    if (inputType != STRING_TYPE_INFO) {
+      ValidationFailure(
+        s"RTrim operator requires a String input, but $child is of type $inputType")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.RTRIM, child.toRexNode)
+}
+
+/**
+ * Planner expression for a string that repeats the base `str` `n` times.
+ *
+ * Expects operands of type (String, Int) and produces a String. Translated to
+ * `FlinkSqlOperatorTable.REPEAT`.
+ */
+case class Repeat(str: PlannerExpression, n: PlannerExpression)
+  extends PlannerExpression with InputTypeSpec {
+
+  override def toString: String = s"($str).repeat($n)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    List(STRING_TYPE_INFO, INT_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = List(str, n)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val strType = str.resultType
+    val countType = n.resultType
+    val typesMatch = strType == STRING_TYPE_INFO && countType == INT_TYPE_INFO
+    if (typesMatch) {
+      ValidationSuccess
+    } else {
+      ValidationFailure("Repeat operator requires (String, Int) input, " +
+        s"but ($str, $n) is of type ($strType, $countType)")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val strRex = str.toRexNode
+    val countRex = n.toRexNode
+    relBuilder.call(FlinkSqlOperatorTable.REPEAT, strRex, countRex)
+  }
+}
+
+/**
+ * Planner expression for a new string which replaces all the occurrences of the search target
+ * with the replacement string (non-overlapping).
+ *
+ * Expects operands of type (String, String, String) and produces a String. Translated to
+ * `FlinkSqlOperatorTable.REPLACE`.
+ */
+case class Replace(
+    str: PlannerExpression,
+    search: PlannerExpression,
+    replacement: PlannerExpression) extends PlannerExpression with InputTypeSpec {
+
+  // NOTE(review): this two-argument constructor mirrors the one on Substring and passes
+  // CharLength(str) as `replacement`, although expectedTypes declares that operand as a String.
+  // It looks like a copy-paste leftover - confirm whether it can be removed.
+  def this(str: PlannerExpression, begin: PlannerExpression) = this(str, begin, CharLength(str))
+
+  override def toString: String = s"($str).replace($search, $replacement)"
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    List(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[PlannerExpression] = List(str, search, replacement)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(FlinkSqlOperatorTable.REPLACE, children.map(_.toRexNode))
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/subquery.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/subquery.scala
new file mode 100644
index 0000000..694e0ea
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/subquery.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rex.{RexNode, RexSubQuery}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils._
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+/**
+ * Planner expression for the IN predicate.
+ *
+ * The right-hand side `elements` is either a single table reference (translated to an IN
+ * sub-query) or a list of expressions (translated to a call of `FlinkSqlOperatorTable.IN`).
+ * The result type is Boolean.
+ *
+ * @param expression the left-hand side that is looked up
+ * @param elements   a single table reference or the list of candidate values; an empty list
+ *                   is rejected by `validateInput`
+ */
+case class In(expression: PlannerExpression, elements: Seq[PlannerExpression])
+  extends PlannerExpression {
+
+  override def toString: String = s"$expression.in(${elements.mkString(", ")})"
+
+  // duplicate elements are dropped before validation and translation
+  override private[flink] def children: Seq[PlannerExpression] = expression +: elements.distinct
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // check if this is a sub-query expression or an element list
+    elements.headOption match {
+
+      case Some(TableReference(_, tableOperation: QueryOperation)) =>
+        RexSubQuery.in(
+          relBuilder.asInstanceOf[FlinkRelBuilder].queryOperation(tableOperation).build(),
+          ImmutableList.of(expression.toRexNode))
+
+      case Some(_) =>
+        relBuilder.call(FlinkSqlOperatorTable.IN, children.map(_.toRexNode): _*)
+
+      case None =>
+        // same exception type that `elements.head` raised before, now with a clear message
+        throw new NoSuchElementException(
+          "IN operator requires at least one element or a table reference.")
+    }
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    // check if this is a sub-query expression or an element list
+    elements.headOption match {
+
+      case None =>
+        ValidationFailure("IN operator requires at least one element or a table reference.")
+
+      case Some(TableReference(name, tableOperation: QueryOperation)) =>
+        if (elements.length != 1) {
+          ValidationFailure("IN operator supports only one table reference.")
+        } else {
+          val tableSchema = tableOperation.getTableSchema
+          if (tableSchema.getFieldCount > 1) {
+            ValidationFailure(
+              s"The sub-query table '$name' must not have more than one column.")
+          } else {
+            validateOperandTypes(expression.resultType, tableSchema.getFieldType(0).get())
+          }
+        }
+
+      case Some(_) =>
+        val types = children.tail.map(_.resultType)
+        if (types.distinct.length != 1) {
+          ValidationFailure(
+            s"Types on the right side of the IN operator must be the same, " +
+              s"got ${types.mkString(", ")}.")
+        } else {
+          // all right-hand types are equal here, so comparing against the last one suffices
+          validateOperandTypes(children.head.resultType, children.last.resultType)
+        }
+    }
+  }
+
+  /**
+   * Checks that the left-hand type `lType` can be compared with the right-hand type `rType`:
+   * the types are equal, both are numeric, or the left one is an array type with the same
+   * type class as the right one.
+   */
+  private def validateOperandTypes(
+      lType: TypeInformation[_],
+      rType: TypeInformation[_]): ValidationResult = {
+    val compatible =
+      lType == rType ||
+        (isNumeric(lType) && isNumeric(rType)) ||
+        (isArray(lType) && lType.getTypeClass == rType.getTypeClass)
+    if (compatible) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"IN operator on incompatible types: $lType and $rType.")
+    }
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+}
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/symbols.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/symbols.scala
new file mode 100644
index 0000000..48135a3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/symbols.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlTrimFunction
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.language.{existentials, implicitConversions}
+
+/**
+ * General expression class to represent a symbol.
+ *
+ * Wraps a [[PlannerSymbol]] as a leaf of an expression tree. It is translated to a Calcite
+ * flag built from the symbol's Java enum via `RexBuilder.makeFlag`.
+ */
+case class SymbolPlannerExpression(symbol: PlannerSymbol) extends LeafExpression {
+
+ // A symbol has no result type; asking for it always throws.
+ override private[flink] def resultType: TypeInformation[_] =
+ throw new UnsupportedOperationException("This should not happen. A symbol has no result type.")
+
+ def toExpr: SymbolPlannerExpression = this // triggers implicit conversion
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ // dirty hack to pass Java enums to Java from Scala
+ // (the cast re-types the wildcard Enum[_] as a self-bounded enum type, presumably so that
+ // it is accepted by makeFlag)
+ val enum = symbol.enum.asInstanceOf[Enum[T] forSome { type T <: Enum[T] }]
+ relBuilder.getRexBuilder.makeFlag(enum)
+ }
+
+ // Rendered as the owning enumeration followed by the symbol name.
+ override def toString: String = s"${symbol.symbols}.${symbol.name}"
+
+}
+
+/**
+ * Symbol that wraps a Calcite symbol in form of a Java enum.
+ */
+trait PlannerSymbol {
+ /** The enumeration this symbol belongs to. */
+ def symbols: PlannerSymbols
+ /** The name of this symbol. */
+ def name: String
+ /** The wrapped Java enum constant. */
+ def enum: Enum[_]
+}
+
+/**
+ * Enumeration of symbols.
+ *
+ * Base class for Scala enumerations whose values each wrap a Java enum constant.
+ */
+abstract class PlannerSymbols extends Enumeration {
+
+ /** Enumeration value that wraps the Java enum constant `e` and is named after it. */
+ class PlannerSymbolValue(e: Enum[_]) extends Val(e.name()) with PlannerSymbol {
+ override def symbols: PlannerSymbols = PlannerSymbols.this
+
+ override def enum: Enum[_] = e
+
+ override def name: String = toString()
+ }
+
+ /** Creates a new enumeration value that wraps the given Java enum constant. */
+ protected final def Value(enum: Enum[_]): PlannerSymbolValue = new PlannerSymbolValue(enum)
+
+ /** Implicitly wraps a symbol value into a [[SymbolPlannerExpression]]. */
+ implicit def symbolToExpression(symbol: PlannerSymbolValue): SymbolPlannerExpression =
+ SymbolPlannerExpression(symbol)
+
+}
+
+/**
+ * Units for working with time intervals.
+ *
+ * Each value wraps the Calcite `TimeUnitRange` constant of the same name.
+ */
+object PlannerTimeIntervalUnit extends PlannerSymbols {
+
+ // Type of the values of this enumeration.
+ type PlannerTimeIntervalUnit = PlannerSymbolValue
+
+ val YEAR = Value(TimeUnitRange.YEAR)
+ val YEAR_TO_MONTH = Value(TimeUnitRange.YEAR_TO_MONTH)
+ val QUARTER = Value(TimeUnitRange.QUARTER)
+ val MONTH = Value(TimeUnitRange.MONTH)
+ val WEEK = Value(TimeUnitRange.WEEK)
+ val DAY = Value(TimeUnitRange.DAY)
+ val DAY_TO_HOUR = Value(TimeUnitRange.DAY_TO_HOUR)
+ val DAY_TO_MINUTE = Value(TimeUnitRange.DAY_TO_MINUTE)
+ val DAY_TO_SECOND = Value(TimeUnitRange.DAY_TO_SECOND)
+ val HOUR = Value(TimeUnitRange.HOUR)
+ val HOUR_TO_MINUTE = Value(TimeUnitRange.HOUR_TO_MINUTE)
+ val HOUR_TO_SECOND = Value(TimeUnitRange.HOUR_TO_SECOND)
+ val MINUTE = Value(TimeUnitRange.MINUTE)
+ val MINUTE_TO_SECOND = Value(TimeUnitRange.MINUTE_TO_SECOND)
+ val SECOND = Value(TimeUnitRange.SECOND)
+
+}
+
+/**
+ * Units for working with time points.
+ *
+ * Each value wraps the Calcite `TimeUnit` constant of the same name.
+ */
+object PlannerTimePointUnit extends PlannerSymbols {
+
+ // Type of the values of this enumeration.
+ type PlannerTimePointUnit = PlannerSymbolValue
+
+ val YEAR = Value(TimeUnit.YEAR)
+ val MONTH = Value(TimeUnit.MONTH)
+ val DAY = Value(TimeUnit.DAY)
+ val HOUR = Value(TimeUnit.HOUR)
+ val MINUTE = Value(TimeUnit.MINUTE)
+ val SECOND = Value(TimeUnit.SECOND)
+ val QUARTER = Value(TimeUnit.QUARTER)
+ val WEEK = Value(TimeUnit.WEEK)
+ val MILLISECOND = Value(TimeUnit.MILLISECOND)
+ val MICROSECOND = Value(TimeUnit.MICROSECOND)
+
+}
+
+/**
+ * Modes for trimming strings.
+ *
+ * Each value wraps the Calcite `SqlTrimFunction.Flag` constant of the same name.
+ */
+object PlannerTrimMode extends PlannerSymbols {
+
+ // Type of the values of this enumeration.
+ type PlannerTrimMode = PlannerSymbolValue
+
+ val BOTH = Value(SqlTrimFunction.Flag.BOTH)
+ val LEADING = Value(SqlTrimFunction.Flag.LEADING)
+ val TRAILING = Value(SqlTrimFunction.Flag.TRAILING)
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/time.scala
new file mode 100644
index 0000000..dee618b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex._
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.expressions.PlannerTimeIntervalUnit.PlannerTimeIntervalUnit
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeInfoCheckUtils}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner expression that extracts one time unit from a temporal value.
+ *
+ * The result type is always LONG. The expression is translated into a call of
+ * `FlinkSqlOperatorTable.EXTRACT` with the unit as first and the temporal as second operand.
+ *
+ * @param timeIntervalUnit symbol expression naming the unit to extract
+ * @param temporal expression producing the value the unit is extracted from
+ */
+case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = List(timeIntervalUnit, temporal)
+
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  /**
+   * Accepts only temporal input whose type is compatible with the requested unit:
+   * YEAR/QUARTER/MONTH/WEEK/DAY need a DATE, TIMESTAMP or interval input, whereas
+   * HOUR/MINUTE/SECOND need a TIME, TIMESTAMP or millisecond-interval input.
+   */
+  override private[flink] def validateInput(): ValidationResult = {
+    val inputType = temporal.resultType
+    if (!TypeInfoCheckUtils.isTemporal(inputType)) {
+      ValidationFailure(s"Extract operator requires Temporal input, " +
+        s"but $temporal is of type ${temporal.resultType}")
+    } else {
+      // input types that carry a date part (or are intervals)
+      val dateLikeInput =
+        inputType == SqlTimeTypeInfo.DATE ||
+          inputType == SqlTimeTypeInfo.TIMESTAMP ||
+          inputType == TimeIntervalTypeInfo.INTERVAL_MILLIS ||
+          inputType == TimeIntervalTypeInfo.INTERVAL_MONTHS
+      // input types that carry a time-of-day part (or are millisecond intervals)
+      val timeLikeInput =
+        inputType == SqlTimeTypeInfo.TIME ||
+          inputType == SqlTimeTypeInfo.TIMESTAMP ||
+          inputType == TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+      timeIntervalUnit match {
+        case SymbolPlannerExpression(PlannerTimeIntervalUnit.YEAR)
+             | SymbolPlannerExpression(PlannerTimeIntervalUnit.QUARTER)
+             | SymbolPlannerExpression(PlannerTimeIntervalUnit.MONTH)
+             | SymbolPlannerExpression(PlannerTimeIntervalUnit.WEEK)
+             | SymbolPlannerExpression(PlannerTimeIntervalUnit.DAY)
+          if dateLikeInput =>
+          ValidationSuccess
+
+        case SymbolPlannerExpression(PlannerTimeIntervalUnit.HOUR)
+             | SymbolPlannerExpression(PlannerTimeIntervalUnit.MINUTE)
+             | SymbolPlannerExpression(PlannerTimeIntervalUnit.SECOND)
+          if timeLikeInput =>
+          ValidationSuccess
+
+        case _ =>
+          ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input" +
+            s" of type '${temporal.resultType}'.")
+      }
+    }
+  }
+
+  override def toString: String = s"($temporal).extract($timeIntervalUnit)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // unit first, temporal second
+    val operands: Seq[RexNode] = Seq(timeIntervalUnit.toRexNode, temporal.toRexNode)
+    relBuilder.getRexBuilder.makeCall(FlinkSqlOperatorTable.EXTRACT, operands)
+  }
+}
+
+/**
+ * Common base of the temporal ceil and floor expressions.
+ *
+ * The result has the same type as the temporal input.
+ *
+ * @param timeIntervalUnit symbol expression naming the unit to round to
+ * @param temporal expression producing the time point that is rounded
+ */
+abstract class TemporalCeilFloor(
+    timeIntervalUnit: PlannerExpression,
+    temporal: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] = List(timeIntervalUnit, temporal)
+
+  override private[flink] def resultType: TypeInformation[_] = temporal.resultType
+
+  /**
+   * Requires a time point input and a time interval unit symbol, and accepts only these
+   * combinations: YEAR/MONTH with DATE or TIMESTAMP, DAY with TIMESTAMP, and
+   * HOUR/MINUTE/SECOND with TIME or TIMESTAMP.
+   */
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isTimePoint(temporal.resultType)) {
+      ValidationFailure(s"Temporal ceil/floor operator requires Time Point input, " +
+        s"but $temporal is of type ${temporal.resultType}")
+    } else {
+      timeIntervalUnit match {
+        case SymbolPlannerExpression(unit: PlannerTimeIntervalUnit) =>
+          (unit, temporal.resultType) match {
+            case (PlannerTimeIntervalUnit.YEAR | PlannerTimeIntervalUnit.MONTH,
+                SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) =>
+              ValidationSuccess
+            case (PlannerTimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) =>
+              ValidationSuccess
+            case (PlannerTimeIntervalUnit.HOUR | PlannerTimeIntervalUnit.MINUTE |
+                PlannerTimeIntervalUnit.SECOND, SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) =>
+              ValidationSuccess
+            case _ =>
+              ValidationFailure(s"Temporal ceil/floor operator does not support " +
+                s"unit '$timeIntervalUnit' for input of type '${temporal.resultType}'.")
+          }
+
+        // anything that is not a symbol of the expected value type
+        case _ =>
+          ValidationFailure(s"Temporal ceil/floor operator requires Time Interval Unit " +
+            s"input, but $timeIntervalUnit is of type ${timeIntervalUnit.resultType}")
+      }
+    }
+  }
+}
+
+/**
+ * Rounds a time point down to the given time interval unit by calling
+ * `FlinkSqlOperatorTable.FLOOR` with the temporal as first and the unit as second operand.
+ *
+ * @param timeIntervalUnit symbol expression naming the unit to round to
+ * @param temporal expression producing the time point that is rounded
+ */
+case class TemporalFloor(
+    timeIntervalUnit: PlannerExpression,
+    temporal: PlannerExpression)
+  extends TemporalCeilFloor(timeIntervalUnit, temporal) {
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val temporalNode = temporal.toRexNode
+    val unitNode = timeIntervalUnit.toRexNode
+    relBuilder.call(FlinkSqlOperatorTable.FLOOR, temporalNode, unitNode)
+  }
+
+  override def toString: String = s"($temporal).floor($timeIntervalUnit)"
+}
+
+/**
+ * Rounds a time point up to the given time interval unit by calling
+ * `FlinkSqlOperatorTable.CEIL` with the temporal as first and the unit as second operand.
+ *
+ * @param timeIntervalUnit symbol expression naming the unit to round to
+ * @param temporal expression producing the time point that is rounded
+ */
+case class TemporalCeil(
+    timeIntervalUnit: PlannerExpression,
+    temporal: PlannerExpression)
+  extends TemporalCeilFloor(timeIntervalUnit, temporal) {
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val temporalNode = temporal.toRexNode
+    val unitNode = timeIntervalUnit.toRexNode
+    relBuilder.call(FlinkSqlOperatorTable.CEIL, temporalNode, unitNode)
+  }
+
+  override def toString: String = s"($temporal).ceil($timeIntervalUnit)"
+}
+
+/**
+ * Base class of the leaf expressions that yield the current date, time or timestamp.
+ *
+ * @param targetType type information of the produced time point
+ * @param local whether the LOCALTIME / LOCALTIMESTAMP operator is used instead of the
+ *              corresponding CURRENT_* operator
+ */
+abstract class CurrentTimePoint(
+    targetType: TypeInformation[_],
+    local: Boolean)
+  extends LeafExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = targetType
+
+  /**
+   * The target type has to be a time point, and a localized variant must not target DATE.
+   */
+  override private[flink] def validateInput(): ValidationResult = {
+    val isPoint = TypeInfoCheckUtils.isTimePoint(targetType)
+    val isLocalDate = local && targetType == SqlTimeTypeInfo.DATE
+    if (isPoint && !isLocalDate) {
+      ValidationSuccess
+    } else if (!isPoint) {
+      ValidationFailure(s"CurrentTimePoint operator requires Time Point target type, " +
+        s"but get $targetType.")
+    } else {
+      ValidationFailure(s"Localized CurrentTimePoint operator requires Time or Timestamp target " +
+        s"type, but get $targetType.")
+    }
+  }
+
+  override def toString: String =
+    if (local) s"local$targetType()" else s"current$targetType()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // DATE has no localized operator; any other target type ends in a MatchError
+    val operator = targetType match {
+      case SqlTimeTypeInfo.DATE =>
+        FlinkSqlOperatorTable.CURRENT_DATE
+      case SqlTimeTypeInfo.TIME =>
+        if (local) FlinkSqlOperatorTable.LOCALTIME else FlinkSqlOperatorTable.CURRENT_TIME
+      case SqlTimeTypeInfo.TIMESTAMP =>
+        if (local) FlinkSqlOperatorTable.LOCALTIMESTAMP else FlinkSqlOperatorTable.CURRENT_TIMESTAMP
+    }
+    relBuilder.call(operator)
+  }
+}
+
+/** Current date: a non-local [[CurrentTimePoint]] with target type DATE. */
+case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false)
+
+/** Current time: a non-local [[CurrentTimePoint]] with target type TIME. */
+case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false)
+
+/** Current timestamp: a non-local [[CurrentTimePoint]] with target type TIMESTAMP. */
+case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false)
+
+/** Local time: a local [[CurrentTimePoint]] with target type TIME. */
+case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true)
+
+/** Local timestamp: a local [[CurrentTimePoint]] with target type TIMESTAMP. */
+case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
+
+/**
+ * Determines whether two anchored time intervals overlap.
+ *
+ * Each interval is described by a time point and a second value that is either another time
+ * point or a time interval that gets added to the first time point.
+ *
+ * @param leftTimePoint anchor of the left interval
+ * @param leftTemporal second time point or length of the left interval
+ * @param rightTimePoint anchor of the right interval
+ * @param rightTemporal second time point or length of the right interval
+ */
+case class TemporalOverlaps(
+    leftTimePoint: PlannerExpression,
+    leftTemporal: PlannerExpression,
+    rightTimePoint: PlannerExpression,
+    rightTemporal: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  /**
+   * Both anchors must be time points of the same type. Each second value must either be a time
+   * point of the same type as its anchor or a time interval.
+   */
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isTimePoint(leftTimePoint.resultType)) {
+      ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint to be of type " +
+        s"Time Point, but get ${leftTimePoint.resultType}.")
+    } else if (!TypeInfoCheckUtils.isTimePoint(rightTimePoint.resultType)) {
+      ValidationFailure(s"TemporalOverlaps operator requires rightTimePoint to be of " +
+        s"type Time Point, but get ${rightTimePoint.resultType}.")
+    } else if (leftTimePoint.resultType != rightTimePoint.resultType) {
+      ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint and " +
+        s"rightTimePoint to be of same type.")
+    } else if (TypeInfoCheckUtils.isTimePoint(leftTemporal.resultType) &&
+        leftTemporal.resultType != leftTimePoint.resultType) {
+      // a left end given as point has to be comparable with the left anchor
+      ValidationFailure(s"TemporalOverlaps operator requires leftTemporal and " +
+        s"leftTimePoint to be of same type if leftTemporal is of type Time Point.")
+    } else if (!TypeInfoCheckUtils.isTimePoint(leftTemporal.resultType) &&
+        !isTimeInterval(leftTemporal.resultType)) {
+      ValidationFailure(s"TemporalOverlaps operator requires leftTemporal to be of " +
+        s"type Time Point or Time Interval.")
+    } else if (TypeInfoCheckUtils.isTimePoint(rightTemporal.resultType) &&
+        rightTemporal.resultType != rightTimePoint.resultType) {
+      // a right end given as point has to be comparable with the right anchor
+      ValidationFailure(s"TemporalOverlaps operator requires rightTemporal and " +
+        s"rightTimePoint to be of same type if rightTemporal is of type Time Point.")
+    } else if (!TypeInfoCheckUtils.isTimePoint(rightTemporal.resultType) &&
+        !isTimeInterval(rightTemporal.resultType)) {
+      ValidationFailure(s"TemporalOverlaps operator requires rightTemporal to be of " +
+        s"type Time Point or Time Interval.")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override def toString: String = s"temporalOverlaps(${children.mkString(", ")})"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val flinkBuilder = relBuilder.asInstanceOf[FlinkRelBuilder]
+    convertOverlaps(
+      leftTimePoint.toRexNode,
+      leftTemporal.toRexNode,
+      rightTimePoint.toRexNode,
+      rightTemporal.toRexNode,
+      flinkBuilder)
+  }
+
+  /**
+   * Standard conversion of the OVERLAPS operator.
+   * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
+   *
+   * Builds the predicate `(e0 >= s1) AND (e1 >= s0)` over the ordered end points of both
+   * intervals.
+   */
+  private def convertOverlaps(
+      leftP: RexNode,
+      leftT: RexNode,
+      rightP: RexNode,
+      rightT: RexNode,
+      relBuilder: FlinkRelBuilder)
+    : RexNode = {
+    // turn interval lengths into absolute end points
+    val leftEnd = convertOverlapsEnd(relBuilder, leftP, leftT, leftTemporal.resultType)
+    val rightEnd = convertOverlapsEnd(relBuilder, rightP, rightT, rightTemporal.resultType)
+
+    // order each pair so that (s0 <= e0) and (s1 <= e1)
+    val (s0, e0) = buildSwap(relBuilder, leftP, leftEnd)
+    val (s1, e1) = buildSwap(relBuilder, rightP, rightEnd)
+
+    val leftReachesRight = relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e0, s1)
+    val rightReachesLeft = relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e1, s0)
+    relBuilder.call(FlinkSqlOperatorTable.AND, leftReachesRight, rightReachesLeft)
+  }
+
+  /**
+   * Returns `start + end` when `endType` is a time interval, otherwise `end` unchanged.
+   */
+  private def convertOverlapsEnd(
+      relBuilder: FlinkRelBuilder,
+      start: RexNode, end: RexNode,
+      endType: TypeInformation[_]) = {
+    if (!isTimeInterval(endType)) {
+      end
+    } else {
+      relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, start, end)
+    }
+  }
+
+  /**
+   * Returns the two nodes as (smaller, larger) pair, built from CASE calls on `start <= end`.
+   */
+  private def buildSwap(relBuilder: FlinkRelBuilder, start: RexNode, end: RexNode) = {
+    val inOrder = relBuilder.call(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, start, end)
+    val lower = relBuilder.call(FlinkSqlOperatorTable.CASE, inOrder, start, end)
+    val upper = relBuilder.call(FlinkSqlOperatorTable.CASE, inOrder, end, start)
+    (lower, upper)
+  }
+}
+
+/**
+ * Planner expression that calls `FlinkSqlOperatorTable.DATE_FORMAT` on a timestamp and a
+ * format expression; the result type is STRING.
+ *
+ * @param timestamp expression producing the value to format
+ * @param format expression producing the format
+ */
+case class DateFormat(timestamp: PlannerExpression, format: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children = List(timestamp, format)
+
+  override private[flink] def resultType = STRING_TYPE_INFO
+
+  override def toString: String = s"$timestamp.dateFormat($format)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder) = {
+    val timestampNode = timestamp.toRexNode
+    val formatNode = format.toRexNode
+    relBuilder.call(FlinkSqlOperatorTable.DATE_FORMAT, timestampNode, formatNode)
+  }
+}
+
+/**
+ * Planner expression for the difference between two time points, counted in a time point unit.
+ *
+ * The result type is INT. The expression is translated into a call of
+ * `FlinkSqlOperatorTable.TIMESTAMP_DIFF`.
+ *
+ * @param timePointUnit symbol expression naming the unit of the result
+ * @param timePoint1 first time point, must be of type DATE or TIMESTAMP
+ * @param timePoint2 second time point, must be of type DATE or TIMESTAMP
+ */
+case class TimestampDiff(
+    timePointUnit: PlannerExpression,
+    timePoint1: PlannerExpression,
+    timePoint2: PlannerExpression)
+  extends PlannerExpression {
+
+  override private[flink] def children: Seq[PlannerExpression] =
+    timePointUnit :: timePoint1 :: timePoint2 :: Nil
+
+  /**
+   * Requires both inputs to be time points of type DATE or TIMESTAMP and the unit to be one of
+   * YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE or SECOND.
+   *
+   * Both operands are checked. Previously the type checks were joined with `||`, so a single
+   * DATE/TIMESTAMP operand was enough to pass validation even when the other one was a TIME.
+   */
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeInfoCheckUtils.isTimePoint(timePoint1.resultType)) {
+      ValidationFailure(
+        s"$this requires an input time point type, " +
+          s"but timePoint1 is of type '${timePoint1.resultType}'.")
+    } else if (!TypeInfoCheckUtils.isTimePoint(timePoint2.resultType)) {
+      ValidationFailure(
+        s"$this requires an input time point type, " +
+          s"but timePoint2 is of type '${timePoint2.resultType}'.")
+    } else {
+      timePointUnit match {
+        case SymbolPlannerExpression(PlannerTimePointUnit.YEAR)
+             | SymbolPlannerExpression(PlannerTimePointUnit.QUARTER)
+             | SymbolPlannerExpression(PlannerTimePointUnit.MONTH)
+             | SymbolPlannerExpression(PlannerTimePointUnit.WEEK)
+             | SymbolPlannerExpression(PlannerTimePointUnit.DAY)
+             | SymbolPlannerExpression(PlannerTimePointUnit.HOUR)
+             | SymbolPlannerExpression(PlannerTimePointUnit.MINUTE)
+             | SymbolPlannerExpression(PlannerTimePointUnit.SECOND)
+          if isDateOrTimestamp(timePoint1.resultType)
+            && isDateOrTimestamp(timePoint2.resultType) =>
+          ValidationSuccess
+
+        case _ =>
+          ValidationFailure(s"$this operator does not support unit '$timePointUnit'" +
+            s" for input of type ('${timePoint1.resultType}', '${timePoint2.resultType}').")
+      }
+    }
+  }
+
+  /** Whether the given type is one of the two time point types supported by this operator. */
+  private def isDateOrTimestamp(timePointType: TypeInformation[_]): Boolean =
+    timePointType == SqlTimeTypeInfo.DATE || timePointType == SqlTimeTypeInfo.TIMESTAMP
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // The operands are passed as (unit, timePoint2, timePoint1), i.e. in swapped order.
+    // NOTE(review): presumably this matches the operand convention of TIMESTAMP_DIFF -- confirm.
+    relBuilder
+      .getRexBuilder
+      .makeCall(FlinkSqlOperatorTable.TIMESTAMP_DIFF,
+        Seq(timePointUnit.toRexNode, timePoint2.toRexNode, timePoint1.toRexNode))
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
new file mode 100644
index 0000000..72b7c25
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
+
+/**
+ * A property of a window that can be bound to a name and reports the type of its value.
+ */
+trait WindowProperty {
+
+ /** Binds this property to the given name. */
+ def toNamedWindowProperty(name: String): NamedWindowProperty
+
+ /** Type information of the value of this property. */
+ def resultType: TypeInformation[_]
+
+}
+
+/**
+ * Base class of window properties that are expressed as unary planner expressions.
+ *
+ * Such a property is only valid on top of a [[WindowReference]] and cannot be translated into
+ * a RexNode.
+ *
+ * @param child expression the property refers to
+ */
+abstract class AbstractWindowProperty(child: PlannerExpression)
+  extends UnaryExpression
+  with WindowProperty {
+
+  override def toString = s"WindowProperty($child)"
+
+  /** Always fails: window properties have no RexNode representation. */
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val message = "WindowProperty cannot be transformed to RexNode."
+    throw new UnsupportedOperationException(message)
+  }
+
+  override private[flink] def validateInput() = child match {
+    case _: WindowReference => ValidationSuccess
+    case _ => ValidationFailure("Child must be a window reference.")
+  }
+
+  def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this)
+}
+
+/**
+ * Window property of result type TIMESTAMP that is printed as `start(child)`.
+ */
+case class WindowStart(child: PlannerExpression) extends AbstractWindowProperty(child) {
+
+ override def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+ override def toString: String = s"start($child)"
+}
+
+/**
+ * Window property of result type TIMESTAMP that is printed as `end(child)`.
+ */
+case class WindowEnd(child: PlannerExpression) extends AbstractWindowProperty(child) {
+
+ override def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+ override def toString: String = s"end($child)"
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 082d17a..df19f5a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -128,6 +128,14 @@ object UserDefinedFunctionUtils {
getParamClassesConsiderVarArgs(method.isVarArgs, method.getParameterTypes, expectedTypes.length)
}
+  /**
+   * Variant of `getEvalMethodSignature` for scalar functions that yields an `Option`: the
+   * parameter classes of the eval method found by `getEvalUserDefinedMethod` for the expected
+   * types, or `None` when that lookup finds no method.
+   */
+  def getEvalMethodSignatureOption(
+      func: ScalarFunction,
+      expectedTypes: Array[LogicalType]): Option[Array[Class[_]]] = {
+    for (evalMethod <- getEvalUserDefinedMethod(func, expectedTypes)) yield {
+      getParamClassesConsiderVarArgs(
+        evalMethod.isVarArgs, evalMethod.getParameterTypes, expectedTypes.length)
+    }
+  }
+
def getEvalMethodSignature(
func: TableFunction[_],
expectedTypes: Array[LogicalType]): Array[Class[_]] = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/TreeNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
new file mode 100644
index 0000000..0535aa4
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan
+
+import org.apache.flink.table.typeutils.TypeInfoCheckUtils
+
+/**
+ * Generic base class for trees that can be transformed and traversed.
+ *
+ * @tparam A concrete node type; every node is itself an `A` (enforced by the self type)
+ */
+abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A =>
+
+  /**
+   * List of child nodes that should be considered when doing transformations. Other values
+   * in the Product will not be transformed, only handed through.
+   */
+  private[flink] def children: Seq[A]
+
+  /**
+   * Tests for equality by first testing for reference equality.
+   */
+  private[flink] def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
+
+  /**
+   * Do tree transformation in post order.
+   *
+   * The children are transformed first. If at least one of them changed, the node is rebuilt
+   * via [[makeCopy]]. Afterwards the rule is applied to the (possibly rebuilt) node itself;
+   * nodes the rule is not defined for are returned as they are.
+   *
+   * @param rule partial function that rewrites the nodes it is defined for
+   * @return the transformed tree
+   */
+  private[flink] def postOrderTransform(rule: PartialFunction[A, A]): A = {
+    def childrenTransform(rule: PartialFunction[A, A]): A = {
+      var changed = false
+
+      // Transforms one child subtree and records whether the result differs from the original.
+      def transformChild(child: TreeNode[_]): TreeNode[_] = {
+        val newChild = child.asInstanceOf[A].postOrderTransform(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      }
+
+      val newArgs = productIterator.map {
+        // constructor argument that is a child itself
+        case arg: TreeNode[_] if children.contains(arg) => transformChild(arg)
+        // constructor argument that is a collection which may contain children
+        case args: Traversable[_] => args.map {
+          case arg: TreeNode[_] if children.contains(arg) => transformChild(arg)
+          case other => other
+        }
+        // everything else is handed through untouched
+        case nonChild: AnyRef => nonChild
+        case null => null
+      }.toArray
+      if (changed) makeCopy(newArgs) else this
+    }
+
+    val afterChildren = childrenTransform(rule)
+    if (afterChildren fastEquals this) {
+      rule.applyOrElse(this, identity[A])
+    } else {
+      rule.applyOrElse(afterChildren, identity[A])
+    }
+  }
+
+  /**
+   * Runs the given function first on the node and then recursively on all its children.
+   */
+  private[flink] def preOrderVisit(f: A => Unit): Unit = {
+    f(this)
+    children.foreach(_.preOrderVisit(f))
+  }
+
+  /**
+   * Creates a new copy of this expression with new children. This is used during transformation
+   * if children change.
+   *
+   * The copy is created reflectively: the first public constructor whose parameter types accept
+   * the classes of `newArgs` is used. If there is none (or `newArgs` contains `null`), the
+   * constructor with the most parameters is tried instead.
+   *
+   * @param newArgs constructor arguments of the copy
+   * @throws RuntimeException if the class has no constructor with parameters or the
+   *                          instantiation fails with a non-fatal error
+   */
+  private[flink] def makeCopy(newArgs: Array[AnyRef]): A = {
+    val ctors = getClass.getConstructors.filter(_.getParameterTypes.length > 0)
+    if (ctors.isEmpty) {
+      throw new RuntimeException(s"No valid constructor for ${getClass.getSimpleName}")
+    }
+
+    val defaultCtor = ctors.find { ctor =>
+      if (ctor.getParameterTypes.length != newArgs.length) {
+        false
+      } else if (newArgs.contains(null)) {
+        false
+      } else {
+        val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
+        TypeInfoCheckUtils.isAssignable(argsClasses, ctor.getParameterTypes)
+      }
+    }.getOrElse(ctors.maxBy(_.getParameterTypes.length))
+
+    try {
+      defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
+    } catch {
+      // Only non-fatal errors are wrapped; fatal JVM errors (e.g. OutOfMemoryError) and
+      // interrupts propagate unchanged instead of being hidden in a RuntimeException.
+      case scala.util.control.NonFatal(e) =>
+        throw new RuntimeException(
+          s"Fail to copy tree node ${getClass.getName}.", e)
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
index 2f51b5b..bbea8a8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
@@ -24,9 +24,8 @@ import org.apache.flink.table.catalog.{FunctionCatalog, FunctionLookup}
import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{AND, CAST, OR}
-import org.apache.flink.table.types.LogicalTypeDataTypeConverter
+import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
import org.apache.flink.table.types.logical.LogicalTypeRoot._
-import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.table.util.Logging
import org.apache.flink.util.Preconditions
@@ -206,7 +205,7 @@ class RexNodeToExpressionConverter(
Preconditions.checkArgument(inputRef.getIndex < inputNames.length)
Some(new FieldReferenceExpression(
inputNames(inputRef.getIndex),
- TypeConversions.fromLogicalToDataType(FlinkTypeFactory.toLogicalType(inputRef.getType)),
+ fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(inputRef.getType)),
0,
inputRef.getIndex
))
@@ -283,7 +282,7 @@ class RexNodeToExpressionConverter(
}
Some(valueLiteral(literalValue,
- LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(literalType)))
+ fromLogicalTypeToDataType(literalType)))
}
override def visitCall(rexCall: RexCall): Option[Expression] = {
@@ -302,7 +301,7 @@ class RexNodeToExpressionConverter(
Option(operands.reduceLeft { (l, r) => unresolvedCall(AND, l, r) })
case SqlStdOperatorTable.CAST =>
Option(unresolvedCall(CAST, operands.head,
- typeLiteral(LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
+ typeLiteral(fromLogicalTypeToDataType(
FlinkTypeFactory.toLogicalType(rexCall.getType)))))
case function: SqlFunction =>
lookupFunction(replace(function.getName), operands)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeInfoCheckUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeInfoCheckUtils.scala
new file mode 100644
index 0000000..d73d9aa
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeInfoCheckUtils.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo._
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, PojoTypeInfo}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.{INTERVAL_MILLIS, INTERVAL_MONTHS}
+import org.apache.flink.table.validate._
+
+/**
+ * Helpers for classifying and validating [[TypeInformation]] instances and for checking
+ * class assignability.
+ */
+object TypeInfoCheckUtils {
+
+  /**
+   * Checks if type information is an advanced type that can be converted to a
+   * SQL type but NOT vice versa.
+   */
+  def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: TimeIndicatorTypeInfo => false
+    case _: BasicTypeInfo[_] => false
+    case _: SqlTimeTypeInfo[_] => false
+    case _: TimeIntervalTypeInfo[_] => false
+    case _ => true
+  }
+
+  /**
+   * Checks if type information is a simple type that can be converted to a
+   * SQL type and vice versa.
+   */
+  def isSimple(dataType: TypeInformation[_]): Boolean = !isAdvanced(dataType)
+
+  /** Checks if the type is a [[NumericTypeInfo]] or the big decimal type. */
+  def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: NumericTypeInfo[_] => true
+    case BIG_DEC_TYPE_INFO => true
+    case _ => false
+  }
+
+  /** Checks if the type is a time point or a time interval. */
+  def isTemporal(dataType: TypeInformation[_]): Boolean =
+    isTimePoint(dataType) || isTimeInterval(dataType)
+
+  /** Checks if the type is a [[SqlTimeTypeInfo]]. */
+  def isTimePoint(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[SqlTimeTypeInfo[_]]
+
+  /** Checks if the type is a [[TimeIntervalTypeInfo]]. */
+  def isTimeInterval(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[TimeIntervalTypeInfo[_]]
+
+  def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO
+
+  def isBoolean(dataType: TypeInformation[_]): Boolean = dataType == BOOLEAN_TYPE_INFO
+
+  def isDecimal(dataType: TypeInformation[_]): Boolean = dataType == BIG_DEC_TYPE_INFO
+
+  def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO
+
+  /** Checks if the type is any [[IntegerTypeInfo]]. */
+  def isIntegerFamily(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[IntegerTypeInfo[_]]
+
+  def isLong(dataType: TypeInformation[_]): Boolean = dataType == LONG_TYPE_INFO
+
+  def isIntervalMonths(dataType: TypeInformation[_]): Boolean = dataType == INTERVAL_MONTHS
+
+  def isIntervalMillis(dataType: TypeInformation[_]): Boolean = dataType == INTERVAL_MILLIS
+
+  /** Checks if the type is an object, basic or primitive array type. */
+  def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: ObjectArrayTypeInfo[_, _] |
+         _: BasicArrayTypeInfo[_, _] |
+         _: PrimitiveArrayTypeInfo[_] => true
+    case _ => false
+  }
+
+  /** Checks if the type is a [[MapTypeInfo]]. */
+  def isMap(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[MapTypeInfo[_, _]]
+
+  /** Checks if the type class implements [[Comparable]] and the type is not an array. */
+  def isComparable(dataType: TypeInformation[_]): Boolean =
+    classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType)
+
+  /**
+   * Types that can be easily converted into a string without ambiguity.
+   */
+  def isSimpleStringRepresentation(dataType: TypeInformation[_]): Boolean =
+    isNumeric(dataType) || isString(dataType) || isTemporal(dataType) || isBoolean(dataType)
+
+  /**
+   * Validates that the type is numeric.
+   *
+   * @param dataType type to check
+   * @param caller name of the caller, used in the failure message
+   */
+  def assertNumericExpr(
+      dataType: TypeInformation[_],
+      caller: String)
+    : ValidationResult = dataType match {
+    case _: NumericTypeInfo[_] =>
+      ValidationSuccess
+    case BIG_DEC_TYPE_INFO =>
+      ValidationSuccess
+    case _ =>
+      ValidationFailure(s"$caller requires numeric types, get $dataType here")
+  }
+
+  /**
+   * Validates that the type is an [[IntegerTypeInfo]].
+   *
+   * @param dataType type to check
+   * @param caller name of the caller, used in the failure message
+   */
+  def assertIntegerFamilyExpr(
+      dataType: TypeInformation[_],
+      caller: String)
+    : ValidationResult = dataType match {
+    case _: IntegerTypeInfo[_] =>
+      ValidationSuccess
+    case _ =>
+      ValidationFailure(s"$caller requires integer types but was '$dataType'.")
+  }
+
+  /**
+   * Validates that the type can be used as sort key.
+   *
+   * @param dataType type to check
+   * @param caller name of the caller, used in the failure message
+   */
+  def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ValidationResult = {
+    if (dataType.isSortKeyType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$caller requires orderable types, get $dataType here")
+    }
+  }
+
+  /**
+   * Checks whether a type implements own hashCode() and equals() methods for storing an instance
+   * in Flink's state or performing a keyBy operation.
+   *
+   * @param name name of the operation
+   * @param t type information to be validated
+   * @throws ValidationException if a checked class inherits hashCode() or equals() from Object
+   */
+  def validateEqualsHashCode(name: String, t: TypeInformation[_]): Unit = t match {
+
+    // make sure that a POJO class is a valid state type
+    case pt: PojoTypeInfo[_] =>
+      // we don't check the types recursively to give a chance of wrapping
+      // proper hashCode/equals methods around an immutable type
+      //
+      // The class that has to be checked is the POJO class described by the type information
+      // (getTypeClass), not the class of the PojoTypeInfo object itself (getClass).
+      validateEqualsHashCode(name, pt.getTypeClass)
+    // recursively check composite types
+    case ct: CompositeType[_] =>
+      validateEqualsHashCode(name, t.getTypeClass)
+      // we check recursively for entering Flink types such as tuples and rows
+      for (i <- 0 until ct.getArity) {
+        val subtype = ct.getTypeAt(i)
+        validateEqualsHashCode(name, subtype)
+      }
+    // check other type information only based on the type class
+    case _: TypeInformation[_] =>
+      validateEqualsHashCode(name, t.getTypeClass)
+  }
+
+  /**
+   * Checks whether a class implements own hashCode() and equals() methods for storing an instance
+   * in Flink's state or performing a keyBy operation.
+   *
+   * @param name name of the operation
+   * @param c class to be validated
+   * @throws ValidationException if the class inherits hashCode() or equals() from Object
+   */
+  def validateEqualsHashCode(name: String, c: Class[_]): Unit = {
+
+    // skip primitives
+    if (!c.isPrimitive) {
+      // check the component type of arrays
+      if (c.isArray) {
+        validateEqualsHashCode(name, c.getComponentType)
+      }
+      // check type for methods
+      else {
+        if (c.getMethod("hashCode").getDeclaringClass eq classOf[Object]) {
+          throw new ValidationException(
+            s"Type '${c.getCanonicalName}' cannot be used in a $name operation because it " +
+              s"does not implement a proper hashCode() method.")
+        }
+        if (c.getMethod("equals", classOf[Object]).getDeclaringClass eq classOf[Object]) {
+          throw new ValidationException(
+            s"Type '${c.getCanonicalName}' cannot be used in a $name operation because it " +
+              s"does not implement a proper equals() method.")
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks if a class is a Java primitive wrapper.
+   */
+  def isPrimitiveWrapper(clazz: Class[_]): Boolean = {
+    clazz == classOf[java.lang.Boolean] ||
+      clazz == classOf[java.lang.Byte] ||
+      clazz == classOf[java.lang.Character] ||
+      clazz == classOf[java.lang.Short] ||
+      clazz == classOf[java.lang.Integer] ||
+      clazz == classOf[java.lang.Long] ||
+      clazz == classOf[java.lang.Double] ||
+      clazz == classOf[java.lang.Float]
+  }
+
+  /**
+   * Checks if one class can be assigned to a variable of another class.
+   *
+   * Adopted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class[], java.lang.Class[])
+   * but without null checks.
+   *
+   * Arrays of different length are never assignable; otherwise the classes are compared
+   * position by position.
+   */
+  def isAssignable(classArray: Array[Class[_]], toClassArray: Array[Class[_]]): Boolean = {
+    classArray.length == toClassArray.length &&
+      classArray.zip(toClassArray).forall { case (cls, toClass) => isAssignable(cls, toClass) }
+  }
+
+  /**
+   * Checks if one class can be assigned to a variable of another class.
+   *
+   * Adopted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class, java.lang.Class) but
+   * without null checks.
+   *
+   * A primitive class is only assignable to itself or to a wider primitive class; all other
+   * classes are compared with `Class#isAssignableFrom`.
+   */
+  def isAssignable(cls: Class[_], toClass: Class[_]): Boolean = {
+    if (cls.equals(toClass)) {
+      return true
+    }
+    if (cls.isPrimitive) {
+      if (!toClass.isPrimitive) {
+        return false
+      }
+      if (java.lang.Integer.TYPE.equals(cls)) {
+        return java.lang.Long.TYPE.equals(toClass) ||
+          java.lang.Float.TYPE.equals(toClass) ||
+          java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Long.TYPE.equals(cls)) {
+        return java.lang.Float.TYPE.equals(toClass) ||
+          java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Boolean.TYPE.equals(cls)) {
+        return false
+      }
+      if (java.lang.Double.TYPE.equals(cls)) {
+        return false
+      }
+      if (java.lang.Float.TYPE.equals(cls)) {
+        return java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Character.TYPE.equals(cls)) {
+        return java.lang.Integer.TYPE.equals(toClass) ||
+          java.lang.Long.TYPE.equals(toClass) ||
+          java.lang.Float.TYPE.equals(toClass) ||
+          java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Short.TYPE.equals(cls)) {
+        return java.lang.Integer.TYPE.equals(toClass) ||
+          java.lang.Long.TYPE.equals(toClass) ||
+          java.lang.Float.TYPE.equals(toClass) ||
+          java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Byte.TYPE.equals(cls)) {
+        return java.lang.Short.TYPE.equals(toClass) ||
+          java.lang.Integer.TYPE.equals(toClass) ||
+          java.lang.Long.TYPE.equals(toClass) ||
+          java.lang.Float.TYPE.equals(toClass) ||
+          java.lang.Double.TYPE.equals(toClass)
+      }
+      // should never get here
+      return false
+    }
+    toClass.isAssignableFrom(cls)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
new file mode 100644
index 0000000..64a568b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.validate
+
+/**
+ * Outcome of a validation: either a success or a failure.
+ */
+sealed trait ValidationResult {
+  def isSuccess: Boolean
+  def isFailure: Boolean = !isSuccess
+
+  /**
+   * Chains this result with `other` to build a cascade of validations.
+   * A failure short-circuits the cascade: if this result is a failure it is returned,
+   * otherwise `other` is returned. The cascade therefore yields the first failure.
+   */
+  def orElse(other: ValidationResult): ValidationResult =
+    if (!isSuccess) this else other
+}
+
+/**
+ * The validation result that signals success; it carries no further information.
+ */
+object ValidationSuccess extends ValidationResult {
+  override val isSuccess: Boolean = true
+}
+
+/**
+ * The validation result that signals failure.
+ *
+ * @param message an error message explaining why the validation failed
+ */
+case class ValidationFailure(message: String) extends ValidationResult {
+  override val isSuccess: Boolean = false
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala
new file mode 100644
index 0000000..6ee4b19
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{lookupCall, unresolvedCall, unresolvedRef}
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions
+
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+ * Tests that the expression parser treats `asc` as a keyword suffix, and that function
+ * names which merely contain that keyword (as prefix, infix or suffix) are parsed as
+ * ordinary function lookups.
+ */
+class KeywordParseTest {
+
+  @Test
+  def testKeyword(): Unit = {
+    // Both the bare form and the call form of the suffix resolve to ORDER_ASC.
+    assertParsesTo(
+      unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, unresolvedRef("f0")),
+      "f0.asc")
+    assertParsesTo(
+      unresolvedCall(BuiltInFunctionDefinitions.ORDER_ASC, unresolvedRef("f0")),
+      "f0.asc()")
+  }
+
+  @Test
+  def testKeywordAsPrefixInFunctionName(): Unit =
+    assertParsesTo(lookupCall("ascii", unresolvedRef("f0")), "f0.ascii()")
+
+  @Test
+  def testKeywordAsInfixInFunctionName(): Unit =
+    assertParsesTo(lookupCall("iiascii", unresolvedRef("f0")), "f0.iiascii()")
+
+  @Test
+  def testKeywordAsSuffixInFunctionName(): Unit =
+    assertParsesTo(lookupCall("iiasc", unresolvedRef("f0")), "f0.iiasc()")
+
+  /** Asserts that parsing `expression` yields `expected`. */
+  private def assertParsesTo(expected: Expression, expression: String): Unit =
+    assertEquals(expected, ExpressionParser.parseExpression(expression))
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
index e4641af..79311e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.util
import org.apache.flink.table.api.Types
import org.apache.flink.table.catalog.FunctionCatalog
import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
-import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.expressions.utils.Func1
+import org.apache.flink.table.expressions.{EqualTo, Expression, ExpressionBridge, ExpressionParser, GreaterThan, Literal, PlannerExpression, PlannerExpressionConverter, Sum, UnresolvedFieldReference}
import org.apache.flink.table.functions.AggregateFunctionDefinition
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{DIVIDE, EQUALS, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL, MINUS, NOT, NOT_EQUALS, OR, PLUS, TIMES}
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{EQUALS, GREATER_THAN, LESS_THAN, LESS_THAN_OR_EQUAL}
import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.functions.utils.ScalarSqlFunction
import org.apache.flink.table.plan.util.InputTypeBuilder.inputOf
@@ -55,6 +55,11 @@ class RexNodeExtractorTest extends RexNodeTestBase {
private val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+ // Bridges expressions to PlannerExpression through the shared PlannerExpressionConverter.
+ private val expressionBridge: ExpressionBridge[PlannerExpression] =
+   new ExpressionBridge[PlannerExpression](
+     functionCatalog, PlannerExpressionConverter.INSTANCE)
+
@Test
def testExtractRefInputFields(): Unit = {
val usedFields = RexNodeExtractor.extractRefInputFields(buildExprs())
@@ -215,13 +220,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
val builder: RexBuilder = new RexBuilder(typeFactory)
val expr = buildConditionExpr()
- // id > 6
- val firstExp = unresolvedCall(GREATER_THAN, unresolvedRef("id"), valueLiteral(6))
-
- // amount * price < 100
- val secondExp = unresolvedCall(LESS_THAN,
- unresolvedCall(TIMES, unresolvedRef("amount"), unresolvedRef("price")),
- valueLiteral(100))
+ val firstExp = ExpressionParser.parseExpression("id > 6")
+ val secondExp = ExpressionParser.parseExpression("amount * price < 100")
val expected: Array[Expression] = Array(firstExp, secondExp)
val (convertedExpressions, unconvertedRexNodes) =
@@ -232,7 +232,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
builder,
functionCatalog)
- assertExpressionArrayEquals(expected, convertedExpressions)
+ assertPlannerExpressionArrayEquals(expected, convertedExpressions)
assertEquals(0, unconvertedRexNodes.length)
}
@@ -255,10 +255,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
relBuilder,
functionCatalog)
- // amount >= id
- val expr = unresolvedCall(GREATER_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id"))
- val expected: Array[Expression] = Array(expr)
- assertExpressionArrayEquals(expected, convertedExpressions)
+ val expected: Array[Expression] = Array(ExpressionParser.parseExpression("amount >= id"))
+ assertPlannerExpressionArrayEquals(expected, convertedExpressions)
assertEquals(0, unconvertedRexNodes.length)
}
@@ -307,25 +305,10 @@ class RexNodeExtractorTest extends RexNodeTestBase {
functionCatalog)
val expected: Array[Expression] = Array(
- // amount < 100 || price == 100 || price == 200
- unresolvedCall(OR,
- unresolvedCall(OR,
- unresolvedCall(LESS_THAN, unresolvedRef("amount"), valueLiteral(100)),
- unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(100))),
- unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(200))
- ),
- // id > 100 || price == 100 || price == 200
- unresolvedCall(OR,
- unresolvedCall(OR,
- unresolvedCall(GREATER_THAN, unresolvedRef("id"), valueLiteral(100)),
- unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(100))),
- unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(200))
- ),
- // not(amount <= id)
- unresolvedCall(NOT,
- unresolvedCall(LESS_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id")))
- )
- assertExpressionArrayEquals(expected, convertedExpressions)
+ ExpressionParser.parseExpression("amount < 100 || price == 100 || price === 200"),
+ ExpressionParser.parseExpression("id > 100 || price == 100 || price === 200"),
+ ExpressionParser.parseExpression("!(amount <= id)"))
+ assertPlannerExpressionArrayEquals(expected, convertedExpressions)
assertEquals(0, unconvertedRexNodes.length)
}
@@ -362,17 +345,13 @@ class RexNodeExtractorTest extends RexNodeTestBase {
functionCatalog)
val expected: Array[Expression] = Array(
- // amount < 100
- unresolvedCall(LESS_THAN, unresolvedRef("amount"), valueLiteral(100)),
- // amount <= id
- unresolvedCall(LESS_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id")),
- // id > 100
- unresolvedCall(GREATER_THAN, unresolvedRef("id"), valueLiteral(100)),
- // price === 100
- unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(100))
+ ExpressionParser.parseExpression("amount < 100"),
+ ExpressionParser.parseExpression("amount <= id"),
+ ExpressionParser.parseExpression("id > 100"),
+ ExpressionParser.parseExpression("price === 100")
)
- assertExpressionArrayEquals(expected, convertedExpressions)
+ assertPlannerExpressionArrayEquals(expected, convertedExpressions)
assertEquals(0, unconvertedRexNodes.length)
}
@@ -421,7 +400,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
unresolvedCall(EQUALS, unresolvedRef("price"), valueLiteral(200.1))
)
- assertExpressionArrayEquals(expected, convertedExpressions)
+ assertPlannerExpressionArrayEquals(expected, convertedExpressions)
assertEquals(0, unconvertedRexNodes.length)
}
@@ -453,22 +432,41 @@ class RexNodeExtractorTest extends RexNodeTestBase {
relBuilder,
functionCatalog)
- val expected = Array[Expression](
- // timestamp_col = '2017-09-10 14:23:01'
- unresolvedCall(EQUALS, unresolvedRef("timestamp_col"), valueLiteral(
- new Timestamp(DateTimeUtils.timestampStringToUnixDate("2017-09-10 14:23:01")))
- ),
- // date_col = '2017-09-12'
- unresolvedCall(EQUALS, unresolvedRef("date_col"), valueLiteral(
- new Date(DateTimeUtils.dateStringToUnixDate("2017-09-12") * DateTimeUtils.MILLIS_PER_DAY))
- ),
- // time_col = '14:23:01'
- unresolvedCall(EQUALS, unresolvedRef("time_col"), valueLiteral(
- new Time(DateTimeUtils.timeStringToUnixDate("14:23:01").longValue()))
+ val timestamp = new Timestamp(DateTimeUtils.timestampStringToUnixDate("2017-09-10 14:23:01"))
+ val date = new Date(
+ DateTimeUtils.dateStringToUnixDate("2017-09-12") * DateTimeUtils.MILLIS_PER_DAY)
+ val time = new Time(DateTimeUtils.timeStringToUnixDate("14:23:01").longValue())
+
+ {
+ val expected = Array[Expression](
+ // timestamp_col = '2017-09-10 14:23:01'
+ unresolvedCall(EQUALS, unresolvedRef("timestamp_col"), valueLiteral(timestamp)),
+ // date_col = '2017-09-12'
+ unresolvedCall(EQUALS, unresolvedRef("date_col"), valueLiteral(date)),
+ // time_col = '14:23:01'
+ unresolvedCall(EQUALS, unresolvedRef("time_col"), valueLiteral(time))
)
- )
- assertExpressionArrayEquals(expected, converted)
+ assertExpressionArrayEquals(expected, converted)
+ }
+
+ {
+ val expected = Array[Expression](
+ EqualTo(
+ UnresolvedFieldReference("timestamp_col"),
+ Literal(timestamp)
+ ),
+ EqualTo(
+ UnresolvedFieldReference("date_col"),
+ Literal(date)
+ ),
+ EqualTo(
+ UnresolvedFieldReference("time_col"),
+ Literal(time)
+ )
+ )
+ assertPlannerExpressionArrayEquals(expected, converted)
+ }
}
@Test
@@ -522,34 +520,18 @@ class RexNodeExtractorTest extends RexNodeTestBase {
functionCatalog)
val expected: Array[Expression] = Array(
- // amount < id
- unresolvedCall(LESS_THAN, unresolvedRef("amount"), unresolvedRef("id")),
- // amount <= id
- unresolvedCall(LESS_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id")),
- // amount <> id
- unresolvedCall(NOT_EQUALS, unresolvedRef("amount"), unresolvedRef("id")),
- // amount = id
- unresolvedCall(EQUALS, unresolvedRef("amount"), unresolvedRef("id")),
- // amount >= id
- unresolvedCall(GREATER_THAN_OR_EQUAL, unresolvedRef("amount"), unresolvedRef("id")),
- // amount > id
- unresolvedCall(GREATER_THAN, unresolvedRef("amount"), unresolvedRef("id")),
- // amount + id == 100
- unresolvedCall(EQUALS,
- unresolvedCall(PLUS, unresolvedRef("amount"), unresolvedRef("id")), valueLiteral(100)),
- // amount - id == 100
- unresolvedCall(EQUALS,
- unresolvedCall(MINUS, unresolvedRef("amount"), unresolvedRef("id")), valueLiteral(100)),
- // amount * id == 100
- unresolvedCall(EQUALS,
- unresolvedCall(TIMES, unresolvedRef("amount"), unresolvedRef("id")), valueLiteral(100)),
- // amount / id == 100
- unresolvedCall(EQUALS,
- unresolvedCall(DIVIDE, unresolvedRef("amount"), unresolvedRef("id")), valueLiteral(100))
- // -amount == 100
- // ExpressionParser.parseExpression("-amount == 100")
+ ExpressionParser.parseExpression("amount < id"),
+ ExpressionParser.parseExpression("amount <= id"),
+ ExpressionParser.parseExpression("amount <> id"),
+ ExpressionParser.parseExpression("amount == id"),
+ ExpressionParser.parseExpression("amount >= id"),
+ ExpressionParser.parseExpression("amount > id"),
+ ExpressionParser.parseExpression("amount + id == 100"),
+ ExpressionParser.parseExpression("amount - id == 100"),
+ ExpressionParser.parseExpression("amount * id == 100"),
+ ExpressionParser.parseExpression("amount / id == 100")
)
- assertExpressionArrayEquals(expected, convertedExpressions)
+ assertPlannerExpressionArrayEquals(expected, convertedExpressions)
assertEquals(0, unconvertedRexNodes.length)
}
@@ -591,17 +573,25 @@ class RexNodeExtractorTest extends RexNodeTestBase {
relBuilder,
functionCatalog)
- val expected: Array[Expression] = Array(
- // sum(amount) > 100
- unresolvedCall(GREATER_THAN,
- unresolvedCall(
- new AggregateFunctionDefinition("sum", new IntSumAggFunction, Types.INT, Types.INT),
- unresolvedRef("amount")),
- valueLiteral(100)
+ {
+ val expected: Array[Expression] = Array(
+ // sum(amount) > 100
+ unresolvedCall(GREATER_THAN,
+ unresolvedCall(
+ new AggregateFunctionDefinition("sum", new IntSumAggFunction, Types.INT, Types.INT),
+ unresolvedRef("amount")),
+ valueLiteral(100)
+ )
)
- )
- assertExpressionArrayEquals(expected, convertedExpressions)
- assertEquals(1, unconvertedRexNodes.length)
+ assertExpressionArrayEquals(expected, convertedExpressions)
+ assertEquals(1, unconvertedRexNodes.length)
+ }
+
+ {
+ val expected: Array[Expression] = Array(
+ GreaterThan(Sum(UnresolvedFieldReference("amount")), Literal(100)))
+ assertPlannerExpressionArrayEquals(expected, convertedExpressions)
+ }
}
@Test
@@ -649,6 +639,10 @@ class RexNodeExtractorTest extends RexNodeTestBase {
assertEquals("or(greaterThan(cast(amount, BIGINT), 100), lessThanOrEqual(amount, id))",
convertedExpressions(2).toString)
assertEquals(0, unconvertedRexNodes.length)
+
+ assertPlannerExpressionArrayEquals(
+ Array(ExpressionParser.parseExpression("amount <= id")),
+ Array(convertedExpressions(1)))
}
@Test
@@ -759,4 +753,17 @@ class RexNodeExtractorTest extends RexNodeTestBase {
}
}
+ /**
+  * Asserts that two expression arrays are equal irrespective of order. Every expression is
+  * bridged through `expressionBridge`, rendered with `toString`, and the sorted renderings
+  * are compared element by element.
+  */
+ private def assertPlannerExpressionArrayEquals(
+     expected: Array[Expression],
+     actual: Array[Expression]): Unit = {
+   // TODO we assume only planner expression as a temporary solution to keep the old interfaces
+   def render(expressions: Array[Expression]): Array[String] =
+     expressions.map(e => expressionBridge.bridge(e).toString).sorted
+
+   val expectedStrings = render(expected)
+   val actualStrings = render(actual)
+
+   assertEquals(expectedStrings.length, actualStrings.length)
+   for ((want, got) <- expectedStrings.zip(actualStrings)) {
+     assertEquals(want, got)
+   }
+ }
+
}