You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/03/01 16:40:41 UTC
[flink] 01/04: [FLINK-16033][table-api] Introduced Java Table API
Expression DSL
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 029d578e098e2b02281d5c320da55c06e239b2b9
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Feb 11 08:25:12 2020 +0100
[FLINK-16033][table-api] Introduced Java Table API Expression DSL
---
.../org/apache/flink/table/api/ApiExpression.java | 66 +
.../org/apache/flink/table/api/Expressions.java | 549 +++++++++
.../org/apache/flink/table/api/GroupWindow.java | 5 +-
.../org/apache/flink/table/api/SessionWithGap.java | 3 +-
.../flink/table/api/SessionWithGapOnTime.java | 5 +-
.../table/api/SessionWithGapOnTimeWithAlias.java | 3 +-
.../org/apache/flink/table/api/SlideWithSize.java | 3 +-
.../flink/table/api/SlideWithSizeAndSlide.java | 5 +-
.../table/api/SlideWithSizeAndSlideOnTime.java | 13 +-
.../api/SlideWithSizeAndSlideOnTimeWithAlias.java | 13 +-
.../org/apache/flink/table/api/TumbleWithSize.java | 3 +-
.../flink/table/api/TumbleWithSizeOnTime.java | 5 +-
.../table/api/TumbleWithSizeOnTimeWithAlias.java | 3 +-
.../flink/table/api/internal/BaseExpressions.java | 1284 ++++++++++++++++++++
.../apache/flink/table/api/internal/TableImpl.java | 37 +-
.../table/expressions/ApiExpressionUtils.java | 39 +-
.../table/expressions/LookupCallExpression.java | 3 +-
.../expressions/UnresolvedCallExpression.java | 7 +-
.../expressions/resolver/ExpressionResolver.java | 2 +
.../expressions/resolver/LookupCallResolver.java | 12 +
.../resolver/rules/OverWindowResolverRule.java | 2 +-
.../expressions/resolver/rules/ResolverRules.java | 6 +
.../resolver/rules/UnwrapApiExpressionRule.java} | 30 +-
.../flink/table/typeutils/FieldInfoUtils.java | 14 +-
.../org/apache/flink/table/api/expressionDsl.scala | 1045 +++-------------
.../expressions/PlannerExpressionConverter.scala | 10 +-
.../expressions/PlannerExpressionConverter.scala | 8 +-
27 files changed, 2205 insertions(+), 970 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java
new file mode 100644
index 0000000..0e2027d
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.table.api.internal.BaseExpressions;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+
+import java.util.List;
+
+/**
+ * Java API class that gives access to expression operations.
+ *
+ * @see BaseExpressions
+ */
+public final class ApiExpression extends BaseExpressions<Object, ApiExpression> implements Expression {
+ private final Expression wrappedExpression;
+
+ @Override
+ public String asSummaryString() {
+ return wrappedExpression.asSummaryString();
+ }
+
+ ApiExpression(Expression wrappedExpression) {
+ if (wrappedExpression instanceof ApiExpression) {
+ throw new UnsupportedOperationException("This is a bug. Please file an issue.");
+ }
+ this.wrappedExpression = wrappedExpression;
+ }
+
+ @Override
+ public Expression toExpr() {
+ return wrappedExpression;
+ }
+
+ @Override
+ protected ApiExpression toApiSpecificExpression(Expression expression) {
+ return new ApiExpression(expression);
+ }
+
+ @Override
+ public List<Expression> getChildren() {
+ return wrappedExpression.getChildren();
+ }
+
+ @Override
+ public <R> R accept(ExpressionVisitor<R> visitor) {
+ return wrappedExpression.accept(visitor);
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
new file mode 100644
index 0000000..9e0c10e
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TimePointUnit;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.types.utils.ValueDataTypeConverter;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+
+/**
+ * Entry point of the Table API Expression DSL such as: {@code $("myField").plus(10).abs()}
+ *
+ * <p>This class contains static methods for referencing table columns, creating literals,
+ * and building more complex {@link Expression} chains. {@link ApiExpression ApiExpressions} are
+ * pure API entities that are further translated into {@link ResolvedExpression ResolvedExpressions}
+ * under the hood.
+ *
+ * <p>For fluent definition of expressions and easier readability, we recommend to add a
+ * star import to the methods of this class:
+ *
+ * <pre>
+ * import static org.apache.flink.table.api.Expressions.*;
+ * </pre>
+ *
+ * <p>Check the documentation for more programming language specific APIs, for example, by using
+ * Scala implicits.
+ */
+@PublicEvolving
+public final class Expressions {
+ /**
+ * Creates an unresolved reference to a table's field.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * tab.select($("key"), $("value"))
+ * }
+ * </pre>
+ */
+ //CHECKSTYLE.OFF: MethodName
+ public static ApiExpression $(String name) {
+ return new ApiExpression(unresolvedRef(name));
+ }
+ //CHECKSTYLE.ON: MethodName
+
+ /**
+ * Creates a SQL literal.
+ *
+ * <p>The data type is derived from the object's class and its value.
+ *
+ * <p>For example:
+ * <ul>
+ * <li>{@code lit(12)} leads to {@code INT}</li>
+ * <li>{@code lit("abc")} leads to {@code CHAR(3)}</li>
+ * <li>{@code lit(new BigDecimal("123.45"))} leads to {@code DECIMAL(5, 2)}</li>
+ * </ul>
+ *
+ * <p>See {@link ValueDataTypeConverter} for a list of supported literal values.
+ */
+ public static ApiExpression lit(Object v) {
+ return new ApiExpression(valueLiteral(v));
+ }
+
+ /**
+ * Creates a SQL literal of a given {@link DataType}.
+ *
+ * <p>The method {@link #lit(Object)} is preferred as it extracts the {@link DataType} automatically.
+ * Use this method only when necessary. The class of {@code v} must be supported according to the
+ * {@link org.apache.flink.table.types.logical.LogicalType#supportsInputConversion(Class)}.
+ */
+ public static ApiExpression lit(Object v, DataType dataType) {
+ return new ApiExpression(valueLiteral(v, dataType));
+ }
+
+ /**
+ * Indicates a range from 'start' to 'end', which can be used in columns
+ * selection.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * Table table = ...
+ * table.select(withColumns(range(b, c)))
+ * }</pre>
+ *
+ * @see #withColumns(Object, Object...)
+ * @see #withoutColumns(Object, Object...)
+ */
+ public static ApiExpression range(String start, String end) {
+ return apiCall(BuiltInFunctionDefinitions.RANGE_TO, unresolvedRef(start), unresolvedRef(end));
+ }
+
+ /**
+ * Indicates an index based range, which can be used in columns selection.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * Table table = ...
+ * table.select(withColumns(range(3, 4)))
+ * }</pre>
+ *
+ * @see #withColumns(Object, Object...)
+ * @see #withoutColumns(Object, Object...)
+ */
+ public static ApiExpression range(int start, int end) {
+ return apiCall(BuiltInFunctionDefinitions.RANGE_TO, valueLiteral(start), valueLiteral(end));
+ }
+
+ /**
+ * Boolean AND in three-valued logic.
+ */
+ public static ApiExpression and(Object predicate0, Object predicate1, Object... predicates) {
+ return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.AND, predicate0, predicate1, predicates);
+ }
+
+ /**
+ * Boolean OR in three-valued logic.
+ */
+ public static ApiExpression or(Object predicate0, Object predicate1, Object... predicates) {
+ return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.OR, predicate0, predicate1, predicates);
+ }
+
+ /**
+ * Offset constant to be used in the {@code preceding} clause of unbounded {@code Over} windows. Use this
+ * constant for a time interval. Unbounded over windows start with the first row of a partition.
+ */
+ public static final ApiExpression UNBOUNDED_ROW = apiCall(BuiltInFunctionDefinitions.UNBOUNDED_ROW);
+
+ /**
+ * Offset constant to be used in the {@code preceding} clause of unbounded {@link Over} windows. Use this
+ * constant for a row-count interval. Unbounded over windows start with the first row of a
+ * partition.
+ */
+ public static final ApiExpression UNBOUNDED_RANGE = apiCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE);
+
+ /**
+ * Offset constant to be used in the {@code following} clause of {@link Over} windows. Use this for setting
+ * the upper bound of the window to the current row.
+ */
+ public static final ApiExpression CURRENT_ROW = apiCall(BuiltInFunctionDefinitions.CURRENT_ROW);
+
+ /**
+ * Offset constant to be used in the {@code following} clause of {@link Over} windows. Use this for setting
+ * the upper bound of the window to the sort key of the current row, i.e., all rows with the same
+ * sort key as the current row are included in the window.
+ */
+ public static final ApiExpression CURRENT_RANGE = apiCall(BuiltInFunctionDefinitions.CURRENT_RANGE);
+
+ /**
+ * Returns the current SQL date in UTC time zone.
+ */
+ public static ApiExpression currentDate() {
+ return apiCall(BuiltInFunctionDefinitions.CURRENT_DATE);
+ }
+
+ /**
+ * Returns the current SQL time in UTC time zone.
+ */
+ public static ApiExpression currentTime() {
+ return apiCall(BuiltInFunctionDefinitions.CURRENT_TIME);
+ }
+
+ /**
+ * Returns the current SQL timestamp in UTC time zone.
+ */
+ public static ApiExpression currentTimestamp() {
+ return apiCall(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP);
+ }
+
+ /**
+ * Returns the current SQL time in local time zone.
+ */
+ public static ApiExpression localTime() {
+ return apiCall(BuiltInFunctionDefinitions.LOCAL_TIME);
+ }
+
+ /**
+ * Returns the current SQL timestamp in local time zone.
+ */
+ public static ApiExpression localTimestamp() {
+ return apiCall(BuiltInFunctionDefinitions.LOCAL_TIMESTAMP);
+ }
+
+ /**
+ * Determines whether two anchored time intervals overlap. Time point and temporal are
+ * transformed into a range defined by two time points (start, end). The function
+ * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
+ *
+ * <p>It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
+ *
+ * <p>e.g.
+ * <pre>{@code
+ * temporalOverlaps(
+ * lit("2:55:00").toTime(),
+ * interval(Duration.ofHour(1)),
+ * lit("3:30:00").toTime(),
+ * interval(Duration.ofHour(2))
+ * }</pre>
+ * leads to true
+ */
+ public static ApiExpression temporalOverlaps(
+ Object leftTimePoint,
+ Object leftTemporal,
+ Object rightTimePoint,
+ Object rightTemporal) {
+ return apiCall(
+ BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS,
+ leftTimePoint,
+ leftTemporal,
+ rightTimePoint,
+ rightTemporal);
+ }
+
+ /**
+ * Formats a timestamp as a string using a specified format.
+ * The format must be compatible with MySQL's date formatting syntax as used by the
+ * date_parse function.
+ *
+ * <p>For example {@code dataFormat($("time"), "%Y, %d %M")} results in strings formatted as "2017, 05 May".
+ *
+ * @param timestamp The timestamp to format as string.
+ * @param format The format of the string.
+ * @return The formatted timestamp as string.
+ */
+ public static ApiExpression dateFormat(
+ Object timestamp,
+ Object format) {
+ return apiCall(BuiltInFunctionDefinitions.DATE_FORMAT, timestamp, format);
+ }
+
+ /**
+ * Returns the (signed) number of {@link TimePointUnit} between timePoint1 and timePoint2.
+ *
+ * <p>For example, {@code timestampDiff(TimePointUnit.DAY, lit("2016-06-15").toDate(), lit("2016-06-18").toDate()}
+ * leads to 3.
+ *
+ * @param timePointUnit The unit to compute diff.
+ * @param timePoint1 The first point in time.
+ * @param timePoint2 The second point in time.
+ * @return The number of intervals as integer value.
+ */
+ public static ApiExpression timestampDiff(
+ TimePointUnit timePointUnit,
+ Object timePoint1,
+ Object timePoint2) {
+ return apiCall(BuiltInFunctionDefinitions.TIMESTAMP_DIFF, valueLiteral(timePointUnit), timePoint1, timePoint2);
+ }
+
+ /**
+ * Creates an array of literals.
+ */
+ public static ApiExpression array(Object head, Object... tail) {
+ return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ARRAY, head, tail);
+ }
+
+ /**
+ * Creates a row of expressions.
+ */
+ public static ApiExpression row(Object head, Object... tail) {
+ return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ROW, head, tail);
+ }
+
+ /**
+ * Creates a map of expressions.
+ *
+ * <pre>{@code
+ * table.select(
+ * map(
+ * "key1", 1,
+ * "key2", 2,
+ * "key3", 3
+ * ))
+ * }</pre>
+ *
+ * <p>Note keys and values should have the same types for all entries.
+ */
+ public static ApiExpression map(Object key, Object value, Object... tail) {
+ return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.MAP, key, value, tail);
+ }
+
+ /**
+ * Creates an interval of rows.
+ *
+ * @see Table#window(GroupWindow)
+ * @see Table#window(OverWindow...)
+ */
+ public static ApiExpression rowInterval(Long rows) {
+ return new ApiExpression(valueLiteral(rows));
+ }
+
+ /**
+ * Returns a value that is closer than any other value to pi.
+ */
+ public static ApiExpression pi() {
+ return apiCall(BuiltInFunctionDefinitions.PI);
+ }
+
+ /**
+ * Returns a value that is closer than any other value to e.
+ */
+ public static ApiExpression e() {
+ return apiCall(BuiltInFunctionDefinitions.E);
+ }
+
+ /**
+ * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
+ */
+ public static ApiExpression rand() {
+ return apiCall(BuiltInFunctionDefinitions.RAND);
+ }
+
+ /**
+ * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with a
+ * initial seed. Two rand() functions will return identical sequences of numbers if they
+ * have same initial seed.
+ */
+ public static ApiExpression rand(Object seed) {
+ return apiCall(BuiltInFunctionDefinitions.RAND, objectToExpression(seed));
+ }
+
+ /**
+ * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
+ * value (exclusive).
+ */
+ public static ApiExpression randInteger(Object bound) {
+ return apiCall(BuiltInFunctionDefinitions.RAND_INTEGER, objectToExpression(bound));
+ }
+
+ /**
+ * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified value
+ * (exclusive) with a initial seed. Two randInteger() functions will return identical sequences
+ * of numbers if they have same initial seed and same bound.
+ */
+ public static ApiExpression randInteger(Object seed, Object bound) {
+ return apiCall(BuiltInFunctionDefinitions.RAND_INTEGER, objectToExpression(seed), objectToExpression(bound));
+ }
+
+ /**
+ * Returns the string that results from concatenating the arguments.
+ * Returns NULL if any argument is NULL.
+ */
+ public static ApiExpression concat(Object string, Object... strings) {
+ return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.CONCAT, string, strings);
+ }
+
+ /**
+ * Calculates the arc tangent of a given coordinate.
+ */
+ public static ApiExpression atan2(Object y, Object x) {
+ return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ATAN2, y, x);
+ }
+
+ /**
+ * Returns negative numeric.
+ */
+ public static ApiExpression negative(Object v) {
+ return apiCall(BuiltInFunctionDefinitions.MINUS_PREFIX, v);
+ }
+
+ /**
+ * Returns the string that results from concatenating the arguments and separator.
+ * Returns NULL If the separator is NULL.
+ *
+ * <p>Note: this function does not skip empty strings. However, it does skip any NULL
+ * values after the separator argument.
+ */
+ public static ApiExpression concatWs(Object separator, Object string, Object... strings) {
+ return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.CONCAT_WS, separator, string, strings);
+ }
+
+ /**
+ * Returns an UUID (Universally Unique Identifier) string (e.g.,
+ * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly
+ * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
+ * generator.
+ */
+ public static ApiExpression uuid() {
+ return apiCall(BuiltInFunctionDefinitions.UUID);
+ }
+
+ /**
+ * Returns a null literal value of a given data type.
+ *
+ * <p>e.g. {@code nullOf(DataTypes.INT())}
+ */
+ public static ApiExpression nullOf(DataType dataType) {
+ return new ApiExpression(valueLiteral(null, dataType));
+ }
+
+ /**
+ * @deprecated This method will be removed in future versions as it uses the old type system.
+ * It is recommended to use {@link #nullOf(DataType)} instead which uses the new type
+ * system based on {@link DataTypes}. Please make sure to use either the old or the new
+ * type system consistently to avoid unintended behavior. See the website
+ * documentation for more information.
+ */
+ public static ApiExpression nullOf(TypeInformation<?> typeInfo) {
+ return nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo));
+ }
+
+ /**
+ * Calculates the logarithm of the given value.
+ */
+ public static ApiExpression log(Object value) {
+ return apiCall(BuiltInFunctionDefinitions.LOG, value);
+ }
+
+ /**
+ * Calculates the logarithm of the given value to the given base.
+ */
+ public static ApiExpression log(Object base, Object value) {
+ return apiCall(BuiltInFunctionDefinitions.LOG, base, value);
+ }
+
+ /**
+ * Ternary conditional operator that decides which of two other expressions should be evaluated
+ * based on a evaluated boolean condition.
+ *
+ * <p>e.g. ifThenElse($("f0") > 5, "A", "B") leads to "A"
+ *
+ * @param condition boolean condition
+ * @param ifTrue expression to be evaluated if condition holds
+ * @param ifFalse expression to be evaluated if condition does not hold
+ */
+ public static ApiExpression ifThenElse(Object condition, Object ifTrue, Object ifFalse) {
+ return apiCall(BuiltInFunctionDefinitions.IF, condition, ifTrue, ifFalse);
+ }
+
+ /**
+ * Creates an expression that selects a range of columns. It can be used wherever an array of
+ * expression is accepted such as function calls, projections, or groupings.
+ *
+ * <p>A range can either be index-based or name-based. Indices start at 1 and boundaries are
+ * inclusive.
+ *
+ * <p>e.g. withColumns(range("b", "c")) or withoutColumns($("*"))
+ */
+ public static ApiExpression withColumns(Object head, Object... tail) {
+ return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITH_COLUMNS, head, tail);
+ }
+
+ /**
+ * Creates an expression that selects all columns except for the given range of columns. It can
+ * be used wherever an array of expression is accepted such as function calls, projections, or
+ * groupings.
+ *
+ * <p>A range can either be index-based or name-based. Indices start at 1 and boundaries are
+ * inclusive.
+ *
+ * <p>e.g. withoutColumns(range("b", "c")) or withoutColumns($("c"))
+ */
+ public static ApiExpression withoutColumns(Object head, Object... tail) {
+ return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITHOUT_COLUMNS, head, tail);
+ }
+
+ /**
+ * A call to a function that will be looked up in a catalog. There are two kinds of functions:
+ * <ul>
+ * <li>System functions - which are identified with one part names</li>
+ * <li>Catalog functions - which are identified always with three parts names
+ * (catalog, database, function)</li>
+ * </ul>
+ *
+ * <p>Moreover each function can either be a temporary function or permanent one
+ * (which is stored in an external catalog).
+ *
+ * <p>Based on that two properties the resolution order for looking up a function based on
+ * the provided {@code functionName} is following:
+ * <ul>
+ * <li>Temporary system function</li>
+ * <li>System function</li>
+ * <li>Temporary catalog function</li>
+ * <li>Catalog function</li>
+ * </ul>
+ *
+ * @see TableEnvironment#useCatalog(String)
+ * @see TableEnvironment#useDatabase(String)
+ * @see TableEnvironment#createTemporaryFunction
+ * @see TableEnvironment#createTemporarySystemFunction
+ */
+ public static ApiExpression call(String path, Object... params) {
+ return new ApiExpression(ApiExpressionUtils.lookupCall(
+ path,
+ Arrays.stream(params).map(ApiExpressionUtils::objectToExpression).toArray(Expression[]::new)));
+ }
+
+ private static ApiExpression apiCall(FunctionDefinition functionDefinition, Object... args) {
+ List<Expression> arguments =
+ Stream.of(args)
+ .map(ApiExpressionUtils::objectToExpression)
+ .collect(Collectors.toList());
+ return new ApiExpression(unresolvedCall(functionDefinition, arguments));
+ }
+
+ private static ApiExpression apiCallAtLeastOneArgument(FunctionDefinition functionDefinition,
+ Object arg0,
+ Object... args) {
+ List<Expression> arguments = Stream.concat(
+ Stream.of(arg0),
+ Stream.of(args)
+ ).map(ApiExpressionUtils::objectToExpression)
+ .collect(Collectors.toList());
+ return new ApiExpression(unresolvedCall(functionDefinition, arguments));
+ }
+
+ private static ApiExpression apiCallAtLeastTwoArgument(
+ FunctionDefinition functionDefinition,
+ Object arg0,
+ Object arg1,
+ Object... args) {
+ List<Expression> arguments = Stream.concat(
+ Stream.of(arg0, arg1),
+ Stream.of(args)
+ ).map(ApiExpressionUtils::objectToExpression)
+ .collect(Collectors.toList());
+ return new ApiExpression(unresolvedCall(functionDefinition, arguments));
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
index 4601e75..36cd855 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
/**
@@ -41,8 +42,8 @@ public abstract class GroupWindow {
private final Expression timeField;
GroupWindow(Expression alias, Expression timeField) {
- this.alias = alias;
- this.timeField = timeField;
+ this.alias = ApiExpressionUtils.unwrapFromApi(alias);
+ this.timeField = ApiExpressionUtils.unwrapFromApi(timeField);
}
public Expression getAlias() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
index 6e646f7..0d9ee6d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
@@ -36,7 +37,7 @@ public final class SessionWithGap {
private final Expression gap;
SessionWithGap(Expression gap) {
- this.gap = gap;
+ this.gap = ApiExpressionUtils.unwrapFromApi(gap);
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
index 53c9c4f..ef63dde 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
@@ -32,8 +33,8 @@ public final class SessionWithGapOnTime {
private final Expression gap;
SessionWithGapOnTime(Expression timeField, Expression gap) {
- this.timeField = timeField;
- this.gap = gap;
+ this.timeField = ApiExpressionUtils.unwrapFromApi(timeField);
+ this.gap = ApiExpressionUtils.unwrapFromApi(gap);
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
index 891ea7c..b88a93e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
/**
@@ -31,7 +32,7 @@ public final class SessionWithGapOnTimeWithAlias extends GroupWindow {
SessionWithGapOnTimeWithAlias(Expression alias, Expression timeField, Expression gap) {
super(alias, timeField);
- this.gap = gap;
+ this.gap = ApiExpressionUtils.unwrapFromApi(gap);
}
public Expression getGap() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
index 44470f9..983d07d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
@@ -32,7 +33,7 @@ public final class SlideWithSize {
private final Expression size;
SlideWithSize(Expression size) {
- this.size = size;
+ this.size = ApiExpressionUtils.unwrapFromApi(size);
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
index 4d509d1..d4d83c1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
@@ -37,8 +38,8 @@ public final class SlideWithSizeAndSlide {
private final Expression slide;
SlideWithSizeAndSlide(Expression size, Expression slide) {
- this.size = size;
- this.slide = slide;
+ this.size = ApiExpressionUtils.unwrapFromApi(size);
+ this.slide = ApiExpressionUtils.unwrapFromApi(slide);
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
index 604b9cd..c74529a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
@@ -33,12 +34,12 @@ public final class SlideWithSizeAndSlideOnTime {
private final Expression slide;
SlideWithSizeAndSlideOnTime(
- Expression timeField,
- Expression size,
- Expression slide) {
- this.timeField = timeField;
- this.size = size;
- this.slide = slide;
+ Expression timeField,
+ Expression size,
+ Expression slide) {
+ this.timeField = ApiExpressionUtils.unwrapFromApi(timeField);
+ this.size = ApiExpressionUtils.unwrapFromApi(size);
+ this.slide = ApiExpressionUtils.unwrapFromApi(slide);
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
index 50b3692..8b9c692 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
/**
@@ -31,13 +32,13 @@ public final class SlideWithSizeAndSlideOnTimeWithAlias extends GroupWindow {
private final Expression slide;
SlideWithSizeAndSlideOnTimeWithAlias(
- Expression alias,
- Expression timeField,
- Expression size,
- Expression slide) {
+ Expression alias,
+ Expression timeField,
+ Expression size,
+ Expression slide) {
super(alias, timeField);
- this.size = size;
- this.slide = slide;
+ this.size = ApiExpressionUtils.unwrapFromApi(size);
+ this.slide = ApiExpressionUtils.unwrapFromApi(slide);
}
public Expression getSize() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
index 600e3a1..bbc1825 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
@@ -36,7 +37,7 @@ public final class TumbleWithSize {
private Expression size;
TumbleWithSize(Expression size) {
- this.size = size;
+ this.size = ApiExpressionUtils.unwrapFromApi(size);
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
index f635ba5..07616d1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
@@ -32,8 +33,8 @@ public final class TumbleWithSizeOnTime {
private final Expression size;
TumbleWithSizeOnTime(Expression time, Expression size) {
- this.time = time;
- this.size = size;
+ this.time = ApiExpressionUtils.unwrapFromApi(time);
+ this.size = ApiExpressionUtils.unwrapFromApi(size);
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
index 5180d33..4e25a4f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
/**
@@ -31,7 +32,7 @@ public final class TumbleWithSizeOnTimeWithAlias extends GroupWindow {
TumbleWithSizeOnTimeWithAlias(Expression alias, Expression timeField, Expression size) {
super(alias, timeField);
- this.size = size;
+ this.size = ApiExpressionUtils.unwrapFromApi(size);
}
public Expression getSize() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
new file mode 100644
index 0000000..41cb93c
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -0,0 +1,1284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_DAY;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_HOUR;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_MINUTE;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_SECOND;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.tableRef;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.toMilliInterval;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.toMonthInterval;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.*;
+//CHECKSTYLE.ON: AvoidStarImport|ImportOrder
+
+/**
+ * These are Java and Scala common operations that can be used to construct an {@link Expression} AST for
+ * expression operations.
+ *
+ * @param <InType> The accepted type of input expressions, it is {@code Expression} for Scala and
+ * {@code Object} for Java. Generally the expression DSL works on expressions, the
+ * reason why Java accepts Object is to remove cumbersome call to {@code lit()} for
+ * literals. Scala alleviates this problem via implicit conversions.
+ * @param <OutType> The produced type of the DSL. It is {@code ApiExpression} for Java and {@code Expression}
+ * for Scala. In Scala the infix operations are included via implicit conversions. In Java
+ * we introduced a wrapper that enables the operations without pulling them through the whole stack.
+ */
+@PublicEvolving
+public abstract class BaseExpressions<InType, OutType> {
+ protected abstract Expression toExpr();
+
+ protected abstract OutType toApiSpecificExpression(Expression expression);
+
+ /**
+ * Specifies a name for an expression i.e. a field.
+ *
+ * @param name name for one field
+ * @param extraNames additional names if the expression expands to multiple fields
+ */
+ public OutType as(String name, String... extraNames) {
+ return toApiSpecificExpression(ApiExpressionUtils.unresolvedCall(
+ BuiltInFunctionDefinitions.AS,
+ Stream.concat(
+ Stream.of(toExpr(), ApiExpressionUtils.valueLiteral(name)),
+ Stream.of(extraNames).map(ApiExpressionUtils::valueLiteral)
+ ).toArray(Expression[]::new)));
+ }
+
+ /**
+ * Boolean AND in three-valued logic. This is an infix notation. See also
+ * {@link Expressions#and(Object, Object, Object...)} for prefix notation with multiple arguments.
+ *
+ * @see Expressions#and(Object, Object, Object...)
+ */
+ public OutType and(InType other) {
+ return toApiSpecificExpression(unresolvedCall(AND, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Boolean OR in three-valued logic. This is an infix notation. See also
+ * {@link Expressions#or(Object, Object, Object...)} for prefix notation with multiple arguments.
+ *
+ * @see Expressions#or(Object, Object, Object...)
+ */
+ public OutType or(InType other) {
+ return toApiSpecificExpression(unresolvedCall(OR, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Greater than.
+ */
+ public OutType isGreater(InType other) {
+ return toApiSpecificExpression(unresolvedCall(GREATER_THAN, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Greater than or equal.
+ */
+ public OutType isGreaterOrEqual(InType other) {
+ return toApiSpecificExpression(unresolvedCall(GREATER_THAN_OR_EQUAL, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Less than.
+ */
+ public OutType isLess(InType other) {
+ return toApiSpecificExpression(unresolvedCall(LESS_THAN, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Less than or equal.
+ */
+ public OutType isLessOrEqual(InType other) {
+ return toApiSpecificExpression(unresolvedCall(LESS_THAN_OR_EQUAL, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Equals.
+ */
+ public OutType isEqual(InType other) {
+ return toApiSpecificExpression(unresolvedCall(EQUALS, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Not equal.
+ */
+ public OutType isNotEqual(InType other) {
+ return toApiSpecificExpression(unresolvedCall(NOT_EQUALS, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Returns left plus right.
+ */
+ public OutType plus(InType other) {
+ return toApiSpecificExpression(unresolvedCall(PLUS, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Returns left minus right.
+ */
+ public OutType minus(InType other) {
+ return toApiSpecificExpression(unresolvedCall(MINUS, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Returns left divided by right.
+ */
+ public OutType dividedBy(InType other) {
+ return toApiSpecificExpression(unresolvedCall(DIVIDE, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Returns left multiplied by right.
+ */
+ public OutType times(InType other) {
+ return toApiSpecificExpression(unresolvedCall(TIMES, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Returns true if the given expression is between lowerBound and upperBound (both inclusive).
+ * False otherwise. The parameters must be numeric types or identical comparable types.
+ *
+ * @param lowerBound numeric or comparable expression
+ * @param upperBound numeric or comparable expression
+ */
+ public OutType between(InType lowerBound, InType upperBound) {
+ return toApiSpecificExpression(unresolvedCall(
+ BETWEEN,
+ toExpr(),
+ objectToExpression(lowerBound),
+ objectToExpression(upperBound)));
+ }
+
+ /**
+ * Returns true if the given expression is not between lowerBound and upperBound (both
+ * inclusive). False otherwise. The parameters must be numeric types or identical
+ * comparable types.
+ *
+ * @param lowerBound numeric or comparable expression
+ * @param upperBound numeric or comparable expression
+ */
+ public OutType notBetween(InType lowerBound, InType upperBound) {
+ return toApiSpecificExpression(unresolvedCall(
+ NOT_BETWEEN,
+ toExpr(),
+ objectToExpression(lowerBound),
+ objectToExpression(upperBound)));
+ }
+
+ /**
+ * Ternary conditional operator that decides which of two other expressions should be evaluated
+ * based on a evaluated boolean condition.
+ *
+ * <p>e.g. lit(42).isGreater(5).then("A", "B") leads to "A"
+ *
+ * @param ifTrue expression to be evaluated if condition holds
+ * @param ifFalse expression to be evaluated if condition does not hold
+ */
+ public OutType then(InType ifTrue, InType ifFalse) {
+ return toApiSpecificExpression(unresolvedCall(
+ IF,
+ toExpr(),
+ objectToExpression(ifTrue),
+ objectToExpression(ifFalse)));
+ }
+
+ /**
+ * Returns true if the given expression is null.
+ */
+ public OutType isNull() {
+ return toApiSpecificExpression(unresolvedCall(IS_NULL, toExpr()));
+ }
+
+ /**
+ * Returns true if the given expression is not null.
+ */
+ public OutType isNotNull() {
+ return toApiSpecificExpression(unresolvedCall(IS_NOT_NULL, toExpr()));
+ }
+
+ /**
+ * Returns true if given boolean expression is true. False otherwise (for null and false).
+ */
+ public OutType isTrue() {
+ return toApiSpecificExpression(unresolvedCall(IS_TRUE, toExpr()));
+ }
+
+ /**
+ * Returns true if given boolean expression is false. False otherwise (for null and true).
+ */
+ public OutType isFalse() {
+ return toApiSpecificExpression(unresolvedCall(IS_FALSE, toExpr()));
+ }
+
+ /**
+ * Returns true if given boolean expression is not true (for null and false). False otherwise.
+ */
+ public OutType isNotTrue() {
+ return toApiSpecificExpression(unresolvedCall(IS_NOT_TRUE, toExpr()));
+ }
+
+ /**
+ * Returns true if given boolean expression is not false (for null and true). False otherwise.
+ */
+ public OutType isNotFalse() {
+ return toApiSpecificExpression(unresolvedCall(IS_NOT_FALSE, toExpr()));
+ }
+
+ /**
+ * Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), declares that an
+ * aggregation function is only applied on distinct input values.
+ *
+ * <p>For example:
+ * <pre>
+ * {@code
+ * orders
+ * .groupBy($("a"))
+ * .select($("a"), $("b").sum().distinct().as("d"))
+ * }
+ * </pre>
+ */
+ public OutType distinct() {
+ return toApiSpecificExpression(unresolvedCall(DISTINCT, toExpr()));
+ }
+
+ /**
+ * Returns the sum of the numeric field across all input values.
+ * If all values are null, null is returned.
+ */
+ public OutType sum() {
+ return toApiSpecificExpression(unresolvedCall(SUM, toExpr()));
+ }
+
+ /**
+ * Returns the sum of the numeric field across all input values.
+ * If all values are null, 0 is returned.
+ */
+ public OutType sum0() {
+ return toApiSpecificExpression(unresolvedCall(SUM0, toExpr()));
+ }
+
+ /**
+ * Returns the minimum value of field across all input values.
+ */
+ public OutType min() {
+ return toApiSpecificExpression(unresolvedCall(MIN, toExpr()));
+ }
+
+ /**
+ * Returns the maximum value of field across all input values.
+ */
+ public OutType max() {
+ return toApiSpecificExpression(unresolvedCall(MAX, toExpr()));
+ }
+
+ /**
+ * Returns the number of input rows for which the field is not null.
+ */
+ public OutType count() {
+ return toApiSpecificExpression(unresolvedCall(COUNT, toExpr()));
+ }
+
+ /**
+ * Returns the average (arithmetic mean) of the numeric field across all input values.
+ */
+ public OutType avg() {
+ return toApiSpecificExpression(unresolvedCall(AVG, toExpr()));
+ }
+
+ /**
+ * Returns the population standard deviation of an expression (the square root of varPop()).
+ */
+ public OutType stddevPop() {
+ return toApiSpecificExpression(unresolvedCall(STDDEV_POP, toExpr()));
+ }
+
+ /**
+ * Returns the sample standard deviation of an expression (the square root of varSamp()).
+ */
+ public OutType stddevSamp() {
+ return toApiSpecificExpression(unresolvedCall(STDDEV_SAMP, toExpr()));
+ }
+
+ /**
+ * Returns the population standard variance of an expression.
+ */
+ public OutType varPop() {
+ return toApiSpecificExpression(unresolvedCall(VAR_POP, toExpr()));
+ }
+
+ /**
+ * Returns the sample variance of a given expression.
+ */
+ public OutType varSamp() {
+ return toApiSpecificExpression(unresolvedCall(VAR_SAMP, toExpr()));
+ }
+
+ /**
+ * Returns multiset aggregate of a given expression.
+ */
+ public OutType collect() {
+ return toApiSpecificExpression(unresolvedCall(COLLECT, toExpr()));
+ }
+
+ /**
+ * Converts a value to a given data type.
+ *
+ * <p>e.g. "42".cast(DataTypes.INT()) leads to 42.
+ */
+ public OutType cast(DataType toType) {
+ return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), typeLiteral(toType)));
+ }
+
+ /**
+ * @deprecated This method will be removed in future versions as it uses the old type system. It
+ * is recommended to use {@link #cast(DataType)} instead which uses the new type system
+ * based on {@link org.apache.flink.table.api.DataTypes}. Please make sure to use either the old
+ * or the new type system consistently to avoid unintended behavior. See the website documentation
+ * for more information.
+ */
+ @Deprecated
+ public OutType cast(TypeInformation<?> toType) {
+ return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), typeLiteral(fromLegacyInfoToDataType(toType))));
+ }
+
+ /**
+ * Specifies ascending order of an expression i.e. a field for orderBy unresolvedCall.
+ */
+ public OutType asc() {
+ return toApiSpecificExpression(unresolvedCall(ORDER_ASC, toExpr()));
+ }
+
+ /**
+ * Specifies descending order of an expression i.e. a field for orderBy unresolvedCall.
+ */
+ public OutType desc() {
+ return toApiSpecificExpression(unresolvedCall(ORDER_DESC, toExpr()));
+ }
+
+ /**
+ * Returns true if an expression exists in a given list of expressions. This is a shorthand
+ * for multiple OR conditions.
+ *
+ * <p>If the testing set contains null, the result will be null if the element can not be found
+ * and true if it can be found. If the element is null, the result is always null.
+ *
+ * <p>e.g. lit("42").in(1, 2, 3) leads to false.
+ */
+ @SafeVarargs
+ public final OutType in(InType... elements) {
+ Expression[] args = Stream.concat(
+ Stream.of(toExpr()),
+ Arrays.stream(elements).map(ApiExpressionUtils::objectToExpression))
+ .toArray(Expression[]::new);
+ return toApiSpecificExpression(unresolvedCall(IN, args));
+ }
+
+ /**
+ * Returns true if an expression exists in a given table sub-query. The sub-query table
+ * must consist of one column. This column must have the same data type as the expression.
+ *
+ * <p>Note: This operation is not supported in a streaming environment yet.
+ */
+ public OutType in(Table table) {
+ return toApiSpecificExpression(unresolvedCall(IN, toExpr(), tableRef(table.toString(), table)));
+ }
+
+ /**
+ * Returns the start time (inclusive) of a window when applied on a window reference.
+ */
+ public OutType start() {
+ return toApiSpecificExpression(unresolvedCall(WINDOW_START, toExpr()));
+ }
+
+ /**
+ * Returns the end time (exclusive) of a window when applied on a window reference.
+ *
+ * <p>e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000.
+ */
+ public OutType end() {
+ return toApiSpecificExpression(unresolvedCall(WINDOW_END, toExpr()));
+ }
+
+ /**
+ * Calculates the remainder of division the given number by another one.
+ */
+ public OutType mod(InType other) {
+ return toApiSpecificExpression(unresolvedCall(MOD, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Calculates the Euler's number raised to the given power.
+ */
+ public OutType exp() {
+ return toApiSpecificExpression(unresolvedCall(EXP, toExpr()));
+ }
+
+ /**
+ * Calculates the base 10 logarithm of the given value.
+ */
+ public OutType log10() {
+ return toApiSpecificExpression(unresolvedCall(LOG10, toExpr()));
+ }
+
+ /**
+ * Calculates the base 2 logarithm of the given value.
+ */
+ public OutType log2() {
+ return toApiSpecificExpression(unresolvedCall(LOG2, toExpr()));
+ }
+
+ /**
+ * Calculates the natural logarithm of the given value.
+ */
+ public OutType ln() {
+ return toApiSpecificExpression(unresolvedCall(LN, toExpr()));
+ }
+
+ /**
+ * Calculates the natural logarithm of the given value.
+ */
+ public OutType log() {
+ return toApiSpecificExpression(unresolvedCall(LOG, toExpr()));
+ }
+
+ /**
+ * Calculates the logarithm of the given value to the given base.
+ */
+ public OutType log(InType base) {
+ return toApiSpecificExpression(unresolvedCall(LOG, objectToExpression(base), toExpr()));
+ }
+
+ /**
+ * Calculates the given number raised to the power of the other value.
+ */
+ public OutType power(InType other) {
+ return toApiSpecificExpression(unresolvedCall(POWER, toExpr(), objectToExpression(other)));
+ }
+
+ /**
+ * Calculates the hyperbolic cosine of a given value.
+ */
+ public OutType cosh() {
+ return toApiSpecificExpression(unresolvedCall(COSH, toExpr()));
+ }
+
+ /**
+ * Calculates the square root of a given value.
+ */
+ public OutType sqrt() {
+ return toApiSpecificExpression(unresolvedCall(SQRT, toExpr()));
+ }
+
+ /**
+ * Calculates the absolute value of given value.
+ */
+ public OutType abs() {
+ return toApiSpecificExpression(unresolvedCall(ABS, toExpr()));
+ }
+
+ /**
+ * Calculates the largest integer less than or equal to a given number.
+ */
+ public OutType floor() {
+ return toApiSpecificExpression(unresolvedCall(FLOOR, toExpr()));
+ }
+
+ /**
+ * Calculates the hyperbolic sine of a given value.
+ */
+ public OutType sinh() {
+ return toApiSpecificExpression(unresolvedCall(SINH, toExpr()));
+ }
+
+ /**
+ * Calculates the smallest integer greater than or equal to a given number.
+ */
+ public OutType ceil() {
+ return toApiSpecificExpression(unresolvedCall(CEIL, toExpr()));
+ }
+
+ /**
+ * Calculates the sine of a given number.
+ */
+ public OutType sin() {
+ return toApiSpecificExpression(unresolvedCall(SIN, toExpr()));
+ }
+
+ /**
+ * Calculates the cosine of a given number.
+ */
+ public OutType cos() {
+ return toApiSpecificExpression(unresolvedCall(COS, toExpr()));
+ }
+
+ /**
+ * Calculates the tangent of a given number.
+ */
+ public OutType tan() {
+ return toApiSpecificExpression(unresolvedCall(TAN, toExpr()));
+ }
+
+ /**
+ * Calculates the cotangent of a given number.
+ */
+ public OutType cot() {
+ return toApiSpecificExpression(unresolvedCall(COT, toExpr()));
+ }
+
+ /**
+ * Calculates the arc sine of a given number.
+ */
+ public OutType asin() {
+ return toApiSpecificExpression(unresolvedCall(ASIN, toExpr()));
+ }
+
+ /**
+ * Calculates the arc cosine of a given number.
+ */
+ public OutType acos() {
+ return toApiSpecificExpression(unresolvedCall(ACOS, toExpr()));
+ }
+
+ /**
+ * Calculates the arc tangent of a given number.
+ */
+ public OutType atan() {
+ return toApiSpecificExpression(unresolvedCall(ATAN, toExpr()));
+ }
+
+ /**
+ * Calculates the hyperbolic tangent of a given number.
+ */
+ public OutType tanh() {
+ return toApiSpecificExpression(unresolvedCall(TANH, toExpr()));
+ }
+
+ /**
+ * Converts numeric from radians to degrees.
+ */
+ public OutType degrees() {
+ return toApiSpecificExpression(unresolvedCall(DEGREES, toExpr()));
+ }
+
+ /**
+ * Converts numeric from degrees to radians.
+ */
+ public OutType radians() {
+ return toApiSpecificExpression(unresolvedCall(RADIANS, toExpr()));
+ }
+
+ /**
+ * Calculates the signum of a given number.
+ */
+ public OutType sign() {
+ return toApiSpecificExpression(unresolvedCall(SIGN, toExpr()));
+ }
+
+ /**
+ * Rounds the given number to integer places right to the decimal point.
+ */
+ public OutType round(InType places) {
+ return toApiSpecificExpression(unresolvedCall(ROUND, toExpr(), objectToExpression(places)));
+ }
+
+ /**
+ * Returns a string representation of an integer numeric value in binary format. Returns null if
+ * numeric is null. E.g. "4" leads to "100", "12" leads to "1100".
+ */
+ public OutType bin() {
+ return toApiSpecificExpression(unresolvedCall(BIN, toExpr()));
+ }
+
+ /**
+ * Returns a string representation of an integer numeric value or a string in hex format. Returns
+ * null if numeric or string is null.
+ *
+ * <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads
+ * to "68656c6c6f2c776f726c64".
+ */
+ public OutType hex() {
+ return toApiSpecificExpression(unresolvedCall(HEX, toExpr()));
+ }
+
+ /**
+ * Returns a number of truncated to n decimal places.
+ * If n is 0,the result has no decimal point or fractional part.
+ * n can be negative to cause n digits left of the decimal point of the value to become zero.
+ * E.g. truncate(42.345, 2) to 42.34.
+ */
+ public OutType truncate(InType n) {
+ return toApiSpecificExpression(unresolvedCall(TRUNCATE, toExpr(), objectToExpression(n)));
+ }
+
+ /**
+ * Returns a number of truncated to 0 decimal places.
+ * E.g. truncate(42.345) to 42.0.
+ */
+ public OutType truncate() {
+ return toApiSpecificExpression(unresolvedCall(TRUNCATE, toExpr()));
+ }
+
+ // String operations
+
+ /**
+ * Creates a substring of the given string at given index for a given length.
+ *
+ * @param beginIndex first character of the substring (starting at 1, inclusive)
+ * @param length number of characters of the substring
+ */
+ public OutType substring(InType beginIndex, InType length) {
+ return toApiSpecificExpression(unresolvedCall(SUBSTRING, toExpr(), objectToExpression(beginIndex), objectToExpression(length)));
+ }
+
+ /**
+ * Creates a substring of the given string beginning at the given index to the end.
+ *
+ * @param beginIndex first character of the substring (starting at 1, inclusive)
+ */
+ public OutType substring(InType beginIndex) {
+ return toApiSpecificExpression(unresolvedCall(SUBSTRING, toExpr(), objectToExpression(beginIndex)));
+ }
+
+ /**
+ * Removes leading space characters from the given string.
+ */
+ public OutType trimLeading() {
+ return toApiSpecificExpression(unresolvedCall(
+ TRIM,
+ valueLiteral(true),
+ valueLiteral(false),
+ valueLiteral(" "),
+ toExpr()));
+ }
+
+ /**
+ * Removes leading characters from the given string.
+ *
+ * @param character string containing the character
+ */
+ public OutType trimLeading(InType character) {
+ return toApiSpecificExpression(unresolvedCall(
+ TRIM,
+ valueLiteral(true),
+ valueLiteral(false),
+ objectToExpression(character),
+ toExpr()));
+ }
+
+ /**
+ * Removes trailing space characters from the given string.
+ */
+ public OutType trimTrailing() {
+ return toApiSpecificExpression(unresolvedCall(
+ TRIM,
+ valueLiteral(false),
+ valueLiteral(true),
+ valueLiteral(" "),
+ toExpr()));
+ }
+
+ /**
+ * Removes trailing characters from the given string.
+ *
+ * @param character string containing the character
+ */
+ public OutType trimTrailing(InType character) {
+ return toApiSpecificExpression(unresolvedCall(
+ TRIM,
+ valueLiteral(false),
+ valueLiteral(true),
+ objectToExpression(character),
+ toExpr()));
+ }
+
+ /**
+ * Removes leading and trailing space characters from the given string.
+ */
+ public OutType trim() {
+ return toApiSpecificExpression(unresolvedCall(
+ TRIM,
+ valueLiteral(true),
+ valueLiteral(true),
+ valueLiteral(" "),
+ toExpr()));
+ }
+
+ /**
+ * Removes leading and trailing characters from the given string.
+ *
+ * @param character string containing the character
+ */
+ public OutType trim(InType character) {
+ return toApiSpecificExpression(unresolvedCall(
+ TRIM,
+ valueLiteral(true),
+ valueLiteral(true),
+ objectToExpression(character),
+ toExpr()));
+ }
+
+ /**
+ * Returns a new string which replaces all the occurrences of the search target
+ * with the replacement string (non-overlapping).
+ */
+ public OutType replace(InType search, InType replacement) {
+ return toApiSpecificExpression(unresolvedCall(REPLACE, toExpr(), objectToExpression(search), objectToExpression(replacement)));
+ }
+
+ /**
+ * Returns the length of a string.
+ */
+ public OutType charLength() {
+ return toApiSpecificExpression(unresolvedCall(CHAR_LENGTH, toExpr()));
+ }
+
+ /**
+ * Returns all of the characters in a string in upper case using the rules of
+ * the default locale.
+ */
+ public OutType upperCase() {
+ return toApiSpecificExpression(unresolvedCall(UPPER, toExpr()));
+ }
+
+ /**
+ * Returns all of the characters in a string in lower case using the rules of
+ * the default locale.
+ */
+ public OutType lowerCase() {
+ return toApiSpecificExpression(unresolvedCall(LOWER, toExpr()));
+ }
+
+ /**
+ * Converts the initial letter of each word in a string to uppercase.
+ * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace.
+ */
+ public OutType initCap() {
+ return toApiSpecificExpression(unresolvedCall(INIT_CAP, toExpr()));
+ }
+
+ /**
+ * Returns true, if a string matches the specified LIKE pattern.
+ *
+ * <p>e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
+ */
+ public OutType like(InType pattern) {
+ return toApiSpecificExpression(unresolvedCall(LIKE, toExpr(), objectToExpression(pattern)));
+ }
+
+ /**
+ * Returns true, if a string matches the specified SQL regex pattern.
+ *
+ * <p>e.g. "A+" matches all strings that consist of at least one A
+ */
+ public OutType similar(InType pattern) {
+ return toApiSpecificExpression(unresolvedCall(SIMILAR, toExpr(), objectToExpression(pattern)));
+ }
+
+ /**
+ * Returns the position of string in an other string starting at 1.
+ * Returns 0 if string could not be found.
+ *
+ * <p>e.g. lit("a").position("bbbbba") leads to 6
+ */
+ public OutType position(InType haystack) {
+ return toApiSpecificExpression(unresolvedCall(POSITION, toExpr(), objectToExpression(haystack)));
+ }
+
+ /**
+ * Returns a string left-padded with the given pad string to a length of len characters. If
+ * the string is longer than len, the return value is shortened to len characters.
+ *
+ * <p>e.g. lit("hi").lpad(4, "??") returns "??hi", lit("hi").lpad(1, '??') returns "h"
+ */
+ public OutType lpad(InType len, InType pad) {
+ return toApiSpecificExpression(unresolvedCall(LPAD, toExpr(), objectToExpression(len), objectToExpression(pad)));
+ }
+
+ /**
+ * Returns a string right-padded with the given pad string to a length of len characters. If
+ * the string is longer than len, the return value is shortened to len characters.
+ *
+ * <p>e.g. lit("hi").rpad(4, "??") returns "hi??", lit("hi").rpad(1, '??') returns "h"
+ */
+ public OutType rpad(InType len, InType pad) {
+ return toApiSpecificExpression(unresolvedCall(RPAD, toExpr(), objectToExpression(len), objectToExpression(pad)));
+ }
+
+ /**
+ * Defines an aggregation to be used for a previously specified over window.
+ *
+ * <p>For example:
+ *
+ * <pre>
+ * {@code
+ * table
+ * .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+ * .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
+ * }</pre>
+ */
+ public OutType over(InType alias) {
+ return toApiSpecificExpression(unresolvedCall(OVER, toExpr(), objectToExpression(alias)));
+ }
+
+ /**
+ * Replaces a substring of string with a string starting at a position (starting at 1).
+ *
+ * <p>e.g. lit("xxxxxtest").overlay("xxxx", 6) leads to "xxxxxxxxx"
+ */
+ public OutType overlay(InType newString, InType starting) {
+ return toApiSpecificExpression(unresolvedCall(
+ OVERLAY,
+ toExpr(),
+ objectToExpression(newString),
+ objectToExpression(starting)));
+ }
+
+ /**
+ * Replaces a substring of string with a string starting at a position (starting at 1).
+ * The length specifies how many characters should be removed.
+ *
+ * <p>e.g. lit("xxxxxtest").overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
+ */
+ public OutType overlay(InType newString, InType starting, InType length) {
+ return toApiSpecificExpression(unresolvedCall(
+ OVERLAY,
+ toExpr(),
+ objectToExpression(newString),
+ objectToExpression(starting),
+ objectToExpression(length)));
+ }
+
+ /**
+ * Returns a string with all substrings that match the regular expression consecutively
+ * being replaced.
+ */
+ public OutType regexpReplace(InType regex, InType replacement) {
+ return toApiSpecificExpression(unresolvedCall(
+ REGEXP_REPLACE,
+ toExpr(),
+ objectToExpression(regex),
+ objectToExpression(replacement)));
+ }
+
+ /**
+ * Returns a string extracted with a specified regular expression and a regex match group
+ * index.
+ */
+ public OutType regexpExtract(InType regex, InType extractIndex) {
+ return toApiSpecificExpression(unresolvedCall(
+ REGEXP_EXTRACT,
+ toExpr(),
+ objectToExpression(regex),
+ objectToExpression(extractIndex)));
+ }
+
+ /**
+ * Returns a string extracted with a specified regular expression.
+ */
+ public OutType regexpExtract(InType regex) {
+ return toApiSpecificExpression(unresolvedCall(REGEXP_EXTRACT, toExpr(), objectToExpression(regex)));
+ }
+
+ /**
+ * Returns the base string decoded with base64.
+ */
+ public OutType fromBase64() {
+ return toApiSpecificExpression(unresolvedCall(FROM_BASE64, toExpr()));
+ }
+
+ /**
+ * Returns the base64-encoded result of the input string.
+ */
+ public OutType toBase64() {
+ return toApiSpecificExpression(unresolvedCall(TO_BASE64, toExpr()));
+ }
+
+ /**
+ * Returns a string that removes the left whitespaces from the given string.
+ */
+ public OutType ltrim() {
+ return toApiSpecificExpression(unresolvedCall(LTRIM, toExpr()));
+ }
+
+ /**
+ * Returns a string that removes the right whitespaces from the given string.
+ */
+ public OutType rtrim() {
+ return toApiSpecificExpression(unresolvedCall(RTRIM, toExpr()));
+ }
+
+ /**
+ * Returns a string that repeats the base string n times.
+ */
+ public OutType repeat(InType n) {
+ return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(), objectToExpression(n)));
+ }
+
+ // Temporal operations
+
+ /**
+ * Parses a date string in the form "yyyy-MM-dd" to a SQL Date.
+ */
+ public OutType toDate() {
+ return toApiSpecificExpression(unresolvedCall(
+ CAST,
+ toExpr(),
+ typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE))));
+ }
+
+ /**
+ * Parses a time string in the form "HH:mm:ss" to a SQL Time.
+ */
+ public OutType toTime() {
+ return toApiSpecificExpression(unresolvedCall(
+ CAST,
+ toExpr(),
+ typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME))));
+ }
+
+ /**
+ * Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp.
+ */
+ public OutType toTimestamp() {
+ return toApiSpecificExpression(unresolvedCall(
+ CAST,
+ toExpr(),
+ typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP))));
+ }
+
+ /**
+ * Extracts parts of a time point or time interval. Returns the part as a long value.
+ *
+ * <p>e.g. lit("2006-06-05").toDate().extract(DAY) leads to 5
+ */
+ public OutType extract(TimeIntervalUnit timeIntervalUnit) {
+ return toApiSpecificExpression(unresolvedCall(EXTRACT, valueLiteral(timeIntervalUnit), toExpr()));
+ }
+
+ /**
+ * Rounds down a time point to the given unit.
+ *
+ * <p>e.g. lit("12:44:31").toDate().floor(MINUTE) leads to 12:44:00
+ */
+ public OutType floor(TimeIntervalUnit timeIntervalUnit) {
+ return toApiSpecificExpression(unresolvedCall(FLOOR, valueLiteral(timeIntervalUnit), toExpr()));
+ }
+
+ /**
+ * Rounds up a time point to the given unit.
+ *
+ * <p>e.g. lit("12:44:31").toDate().ceil(MINUTE) leads to 12:45:00
+ */
+ public OutType ceil(TimeIntervalUnit timeIntervalUnit) {
+ return toApiSpecificExpression(unresolvedCall(
+ CEIL,
+ valueLiteral(timeIntervalUnit),
+ toExpr()));
+ }
+
+ // Advanced type helper functions
+
+ /**
+ * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
+ * returns it's value.
+ *
+ * @param name name of the field (similar to Flink's field expressions)
+ */
+ public OutType get(String name) {
+ return toApiSpecificExpression(unresolvedCall(GET, toExpr(), valueLiteral(name)));
+ }
+
+ /**
+ * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
+ * returns it's value.
+ *
+ * @param index position of the field
+ */
+ public OutType get(int index) {
+ return toApiSpecificExpression(unresolvedCall(GET, toExpr(), valueLiteral(index)));
+ }
+
+ /**
+ * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
+ * into a flat representation where every subtype is a separate field.
+ */
+ public OutType flatten() {
+ return toApiSpecificExpression(unresolvedCall(FLATTEN, toExpr()));
+ }
+
+ /**
+ * Accesses the element of an array or map based on a key or an index (starting at 1).
+ *
+ * @param index key or position of the element (array index starting at 1)
+ */
+ public OutType at(InType index) {
+ return toApiSpecificExpression(unresolvedCall(AT, toExpr(), objectToExpression(index)));
+ }
+
+ /**
+ * Returns the number of elements of an array or number of entries of a map.
+ */
+ public OutType cardinality() {
+ return toApiSpecificExpression(unresolvedCall(CARDINALITY, toExpr()));
+ }
+
+ /**
+ * Returns the sole element of an array with a single element. Returns null if the array is
+ * empty. Throws an exception if the array has more than one element.
+ */
+ public OutType element() {
+ return toApiSpecificExpression(unresolvedCall(ARRAY_ELEMENT, toExpr()));
+ }
+
+ // Time definition
+
+ /**
+ * Declares a field as the rowtime attribute for indicating, accessing, and working in
+ * Flink's event time.
+ */
+ public OutType rowtime() {
+ return toApiSpecificExpression(unresolvedCall(ROWTIME, toExpr()));
+ }
+
+ /**
+ * Declares a field as the proctime attribute for indicating, accessing, and working in
+ * Flink's processing time.
+ */
+ public OutType proctime() {
+ return toApiSpecificExpression(unresolvedCall(PROCTIME, toExpr()));
+ }
+
+ /**
+ * Creates an interval of the given number of years.
+ *
+ * <p>The produced expression is of type {@code DataTypes.INTERVAL}
+ */
+ public OutType year() {
+ return toApiSpecificExpression(toMonthInterval(toExpr(), 12));
+ }
+
+ /**
+ * Creates an interval of the given number of years.
+ */
+ public OutType years() {
+ return year();
+ }
+
+ /**
+ * Creates an interval of the given number of quarters.
+ */
+ public OutType quarter() {
+ return toApiSpecificExpression(toMonthInterval(toExpr(), 3));
+ }
+
+ /**
+ * Creates an interval of the given number of quarters.
+ */
+ public OutType quarters() {
+ return quarter();
+ }
+
+ /**
+ * Creates an interval of the given number of months.
+ */
+ public OutType month() {
+ return toApiSpecificExpression(toMonthInterval(toExpr(), 1));
+ }
+
+ /**
+ * Creates an interval of the given number of months.
+ */
+ public OutType months() {
+ return month();
+ }
+
+ /**
+ * Creates an interval of the given number of weeks.
+ */
+ public OutType week() {
+ return toApiSpecificExpression(toMilliInterval(toExpr(), 7 * MILLIS_PER_DAY));
+ }
+
+ /**
+ * Creates an interval of the given number of weeks.
+ */
+ public OutType weeks() {
+ return week();
+ }
+
+ /**
+ * Creates an interval of the given number of days.
+ */
+ public OutType day() {
+ return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_DAY));
+ }
+
+ /**
+ * Creates an interval of the given number of days.
+ */
+ public OutType days() {
+ return day();
+ }
+
+ /**
+ * Creates an interval of the given number of hours.
+ */
+ public OutType hour() {
+ return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_HOUR));
+ }
+
+ /**
+ * Creates an interval of the given number of hours.
+ */
+ public OutType hours() {
+ return hour();
+ }
+
+ /**
+ * Creates an interval of the given number of minutes.
+ */
+ public OutType minute() {
+ return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_MINUTE));
+ }
+
+ /**
+ * Creates an interval of the given number of minutes.
+ */
+ public OutType minutes() {
+ return minute();
+ }
+
+ /**
+ * Creates an interval of the given number of seconds.
+ */
+ public OutType second() {
+ return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_SECOND));
+ }
+
+ /**
+ * Creates an interval of the given number of seconds.
+ */
+ public OutType seconds() {
+ return second();
+ }
+
+ /**
+ * Creates an interval of the given number of milliseconds.
+ */
+ public OutType milli() {
+ return toApiSpecificExpression(toMilliInterval(toExpr(), 1));
+ }
+
+ /**
+ * Creates an interval of the given number of milliseconds.
+ */
+ public OutType millis() {
+ return milli();
+ }
+
+ // Hash functions
+
+ /**
+ * Returns the MD5 hash of the string argument; null if string is null.
+ *
+ * @return string of 32 hexadecimal digits or null
+ */
+ public OutType md5() {
+ return toApiSpecificExpression(unresolvedCall(MD5, toExpr()));
+ }
+
+ /**
+ * Returns the SHA-1 hash of the string argument; null if string is null.
+ *
+ * @return string of 40 hexadecimal digits or null
+ */
+ public OutType sha1() {
+ return toApiSpecificExpression(unresolvedCall(SHA1, toExpr()));
+ }
+
+ /**
+ * Returns the SHA-224 hash of the string argument; null if string is null.
+ *
+ * @return string of 56 hexadecimal digits or null
+ */
+ public OutType sha224() {
+ return toApiSpecificExpression(unresolvedCall(SHA224, toExpr()));
+ }
+
+ /**
+ * Returns the SHA-256 hash of the string argument; null if string is null.
+ *
+ * @return string of 64 hexadecimal digits or null
+ */
+ public OutType sha256() {
+ return toApiSpecificExpression(unresolvedCall(SHA256, toExpr()));
+ }
+
+ /**
+ * Returns the SHA-384 hash of the string argument; null if string is null.
+ *
+ * @return string of 96 hexadecimal digits or null
+ */
+ public OutType sha384() {
+ return toApiSpecificExpression(unresolvedCall(SHA384, toExpr()));
+ }
+
+ /**
+ * Returns the SHA-512 hash of the string argument; null if string is null.
+ *
+ * @return string of 128 hexadecimal digits or null
+ */
+ public OutType sha512() {
+ return toApiSpecificExpression(unresolvedCall(SHA512, toExpr()));
+ }
+
+ /**
+ * Returns the hash for the given string expression using the SHA-2 family of hash
+ * functions (SHA-224, SHA-256, SHA-384, or SHA-512).
+ *
+ * @param hashLength bit length of the result (either 224, 256, 384, or 512)
+ * @return string or null if one of the arguments is null.
+ */
+ public OutType sha2(InType hashLength) {
+ return toApiSpecificExpression(unresolvedCall(SHA2, toExpr(), objectToExpression(hashLength)));
+ }
+}
+
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 2b545ab..2333ffa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -119,9 +119,7 @@ public class TableImpl implements Table {
@Override
public Table select(Expression... fields) {
- List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
- .map(f -> f.accept(lookupResolver))
- .collect(Collectors.toList());
+ List<Expression> expressionsWithResolvedCalls = preprocessExpressions(fields);
CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
expressionsWithResolvedCalls
);
@@ -464,9 +462,7 @@ public class TableImpl implements Table {
}
private Table addColumnsOperation(boolean replaceIfExist, List<Expression> fields) {
- List<Expression> expressionsWithResolvedCalls = fields.stream()
- .map(f -> f.accept(lookupResolver))
- .collect(Collectors.toList());
+ List<Expression> expressionsWithResolvedCalls = preprocessExpressions(fields);
CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
expressionsWithResolvedCalls
);
@@ -563,6 +559,16 @@ public class TableImpl implements Table {
return new TableImpl(tableEnvironment, operation, operationTreeBuilder, lookupResolver);
}
+ private List<Expression> preprocessExpressions(List<Expression> expressions) {
+ return preprocessExpressions(expressions.toArray(new Expression[0]));
+ }
+
+ private List<Expression> preprocessExpressions(Expression[] expressions) {
+ return Arrays.stream(expressions)
+ .map(f -> f.accept(lookupResolver))
+ .collect(Collectors.toList());
+ }
+
private static final class GroupedTableImpl implements GroupedTable {
private final TableImpl table;
@@ -582,9 +588,7 @@ public class TableImpl implements Table {
@Override
public Table select(Expression... fields) {
- List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
- .map(f -> f.accept(table.lookupResolver))
- .collect(Collectors.toList());
+ List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields);
CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
expressionsWithResolvedCalls
);
@@ -716,7 +720,8 @@ public class TableImpl implements Table {
@Override
public WindowGroupedTable groupBy(Expression... fields) {
- List<Expression> fieldsWithoutWindow = Arrays.stream(fields)
+ List<Expression> fieldsWithoutWindow = table.preprocessExpressions(fields)
+ .stream()
.filter(f -> !window.getAlias().equals(f))
.collect(Collectors.toList());
if (fields.length != fieldsWithoutWindow.size() + 1) {
@@ -749,9 +754,7 @@ public class TableImpl implements Table {
@Override
public Table select(Expression... fields) {
- List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
- .map(f -> f.accept(table.lookupResolver))
- .collect(Collectors.toList());
+ List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields);
CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
expressionsWithResolvedCalls
);
@@ -816,9 +819,7 @@ public class TableImpl implements Table {
@Override
public Table select(Expression... fields) {
- List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
- .map(f -> f.accept(table.lookupResolver))
- .collect(Collectors.toList());
+ List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields);
CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
expressionsWithResolvedCalls
);
@@ -873,9 +874,7 @@ public class TableImpl implements Table {
@Override
public Table select(Expression... fields) {
- List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
- .map(f -> f.accept(table.lookupResolver))
- .collect(Collectors.toList());
+ List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields);
CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
expressionsWithResolvedCalls
);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index ec57fd5..92f8d83 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.expressions;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.ValidationException;
@@ -31,6 +32,7 @@ import org.apache.flink.table.types.DataType;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Utilities for API-specific {@link Expression}s.
@@ -50,6 +52,24 @@ public final class ApiExpressionUtils {
// private
}
+ public static Expression objectToExpression(Object expression) {
+ if (expression instanceof ApiExpression) {
+ return ((ApiExpression) expression).toExpr();
+ } else if (expression instanceof Expression) {
+ return (Expression) expression;
+ } else {
+ return valueLiteral(expression);
+ }
+ }
+
+ public static Expression unwrapFromApi(Expression expression) {
+ if (expression instanceof ApiExpression) {
+ return ((ApiExpression) expression).toExpr();
+ } else {
+ return expression;
+ }
+ }
+
public static LocalReferenceExpression localRef(String name, DataType dataType) {
return new LocalReferenceExpression(name, dataType);
}
@@ -74,14 +94,17 @@ public final class ApiExpressionUtils {
FunctionIdentifier functionIdentifier,
FunctionDefinition functionDefinition,
Expression... args) {
- return new UnresolvedCallExpression(functionIdentifier, functionDefinition, Arrays.asList(args));
+ return unresolvedCall(functionIdentifier, functionDefinition, Arrays.asList(args));
}
public static UnresolvedCallExpression unresolvedCall(
FunctionIdentifier functionIdentifier,
FunctionDefinition functionDefinition,
List<Expression> args) {
- return new UnresolvedCallExpression(functionIdentifier, functionDefinition, args);
+ return new UnresolvedCallExpression(functionIdentifier, functionDefinition,
+ args.stream()
+ .map(ApiExpressionUtils::unwrapFromApi)
+ .collect(Collectors.toList()));
}
public static UnresolvedCallExpression unresolvedCall(FunctionDefinition functionDefinition, Expression... args) {
@@ -89,7 +112,11 @@ public final class ApiExpressionUtils {
}
public static UnresolvedCallExpression unresolvedCall(FunctionDefinition functionDefinition, List<Expression> args) {
- return new UnresolvedCallExpression(functionDefinition, args);
+ return new UnresolvedCallExpression(
+ functionDefinition,
+ args.stream()
+ .map(ApiExpressionUtils::unwrapFromApi)
+ .collect(Collectors.toList()));
}
public static TableReferenceExpression tableRef(String name, Table table) {
@@ -101,7 +128,11 @@ public final class ApiExpressionUtils {
}
public static LookupCallExpression lookupCall(String name, Expression... args) {
- return new LookupCallExpression(name, Arrays.asList(args));
+ return new LookupCallExpression(
+ name,
+ Arrays.stream(args)
+ .map(ApiExpressionUtils::unwrapFromApi)
+ .collect(Collectors.toList()));
}
public static Expression toMonthInterval(Expression e, int multiplier) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
index d30efd5..ef0663a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.util.Preconditions;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -43,7 +42,7 @@ public final class LookupCallExpression implements Expression {
LookupCallExpression(String unresolvedFunction, List<Expression> args) {
this.unresolvedName = Preconditions.checkNotNull(unresolvedFunction);
- this.args = Collections.unmodifiableList(new ArrayList<>(Preconditions.checkNotNull(args)));
+ this.args = Collections.unmodifiableList(Preconditions.checkNotNull(args));
}
public String getUnresolvedName() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
index 7fdf57d..a280d0e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -61,8 +60,7 @@ public final class UnresolvedCallExpression implements Expression {
Preconditions.checkNotNull(functionIdentifier, "Function identifier must not be null.");
this.functionDefinition =
Preconditions.checkNotNull(functionDefinition, "Function definition must not be null.");
- this.args = Collections.unmodifiableList(
- new ArrayList<>(Preconditions.checkNotNull(args, "Arguments must not be null.")));
+ this.args = Collections.unmodifiableList(Preconditions.checkNotNull(args, "Arguments must not be null."));
}
UnresolvedCallExpression(
@@ -71,8 +69,7 @@ public final class UnresolvedCallExpression implements Expression {
this.functionIdentifier = null;
this.functionDefinition =
Preconditions.checkNotNull(functionDefinition, "Function definition must not be null.");
- this.args = Collections.unmodifiableList(
- new ArrayList<>(Preconditions.checkNotNull(args, "Arguments must not be null.")));
+ this.args = Collections.unmodifiableList(Preconditions.checkNotNull(args, "Arguments must not be null."));
}
public Optional<FunctionIdentifier> getFunctionIdentifier() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 434697d..59ef852 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -77,6 +77,7 @@ public class ExpressionResolver {
*/
public static List<ResolverRule> getExpandingResolverRules() {
return Arrays.asList(
+ ResolverRules.UNWRAP_API_EXPRESSION,
ResolverRules.LOOKUP_CALL_BY_NAME,
ResolverRules.FLATTEN_STAR_REFERENCE,
ResolverRules.EXPAND_COLUMN_FUNCTIONS);
@@ -87,6 +88,7 @@ public class ExpressionResolver {
*/
public static List<ResolverRule> getAllResolverRules() {
return Arrays.asList(
+ ResolverRules.UNWRAP_API_EXPRESSION,
ResolverRules.LOOKUP_CALL_BY_NAME,
ResolverRules.FLATTEN_STAR_REFERENCE,
ResolverRules.EXPAND_COLUMN_FUNCTIONS,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
index 93471ef..3071fa6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.expressions.resolver;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.FunctionLookup;
import org.apache.flink.table.expressions.Expression;
@@ -65,6 +66,17 @@ public class LookupCallResolver extends ApiExpressionDefaultVisitor<Expression>
}
@Override
+ public Expression visitNonApiExpression(Expression other) {
+ // LookupCallResolver might be called outside of ExpressionResolver, thus we need to additionally
+ // handle the ApiExpressions here
+ if (other instanceof ApiExpression) {
+ return ((ApiExpression) other).toExpr().accept(this);
+ } else {
+ return defaultMethod(other);
+ }
+ }
+
+ @Override
protected Expression defaultMethod(Expression expression) {
return expression;
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
index 42dc0757..3acb5af 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
@@ -55,7 +55,7 @@ final class OverWindowResolverRule implements ResolverRule {
.collect(Collectors.toList());
}
- private class ExpressionResolverVisitor extends RuleExpressionVisitor<Expression> {
+ private static class ExpressionResolverVisitor extends RuleExpressionVisitor<Expression> {
ExpressionResolverVisitor(ResolutionContext context) {
super(context);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index 671b1e1..aab2272 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.expressions.resolver.rules;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
/**
@@ -62,6 +63,11 @@ public final class ResolverRules {
*/
public static final ResolverRule QUALIFY_BUILT_IN_FUNCTIONS = new QualifyBuiltInFunctionsRule();
+ /**
+ * Unwraps all {@link ApiExpression}.
+ */
+ public static final ResolverRule UNWRAP_API_EXPRESSION = new UnwrapApiExpressionRule();
+
private ResolverRules() {
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java
similarity index 59%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java
index 5180d33..1ef374b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java
@@ -16,25 +16,25 @@
* limitations under the License.
*/
-package org.apache.flink.table.api;
+package org.apache.flink.table.expressions.resolver.rules;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
+import java.util.List;
+import java.util.stream.Collectors;
+
/**
- * Tumbling window on time with alias. Fully specifies a window.
+ * Unwraps all {@link ApiExpression}.
*/
-@PublicEvolving
-public final class TumbleWithSizeOnTimeWithAlias extends GroupWindow {
-
- private final Expression size;
-
- TumbleWithSizeOnTimeWithAlias(Expression alias, Expression timeField, Expression size) {
- super(alias, timeField);
- this.size = size;
- }
-
- public Expression getSize() {
- return size;
+@Internal
+final class UnwrapApiExpressionRule implements ResolverRule {
+ @Override
+ public List<Expression> apply(
+ List<Expression> expression,
+ ResolutionContext context) {
+ return expression.stream().map(ApiExpressionUtils::unwrapFromApi).collect(Collectors.toList());
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
index 4877bc4..3a3b0a0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionUtils;
import org.apache.flink.table.expressions.UnresolvedCallExpression;
@@ -154,7 +155,7 @@ public class FieldInfoUtils {
* used if the input type has a defined field order (tuple, case class, Row) and no of fields
* references a field of the input type.
*/
- public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
+ private static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
if (!(ct instanceof TupleTypeInfoBase)) {
return false;
}
@@ -224,7 +225,10 @@ public class FieldInfoUtils {
public static <A> TypeInfoSchema getFieldsInfo(TypeInformation<A> inputType, Expression[] expressions) {
validateInputTypeInfo(inputType);
- final List<FieldInfo> fieldInfos = extractFieldInformation(inputType, expressions);
+ final List<FieldInfo> fieldInfos = extractFieldInformation(
+ inputType,
+ Arrays.stream(expressions).map(ApiExpressionUtils::unwrapFromApi).toArray(Expression[]::new)
+ );
validateNoStarReference(fieldInfos);
boolean isRowtimeAttribute = checkIfRowtimeAttribute(fieldInfos);
@@ -244,8 +248,8 @@ public class FieldInfoUtils {
}
private static <A> List<FieldInfo> extractFieldInformation(
- TypeInformation<A> inputType,
- Expression[] exprs) {
+ TypeInformation<A> inputType,
+ Expression[] exprs) {
final List<FieldInfo> fieldInfos;
if (inputType instanceof GenericTypeInfo && inputType.getTypeClass() == Row.class) {
throw new ValidationException(
@@ -367,7 +371,7 @@ public class FieldInfoUtils {
final TypeInformation<?>[] fieldTypes;
if (inputType instanceof CompositeType) {
int arity = inputType.getArity();
- CompositeType ct = (CompositeType<?>) inputType;
+ CompositeType<?> ct = (CompositeType<?>) inputType;
fieldTypes = IntStream.range(0, arity).mapToObj(ct::getTypeAt).toArray(TypeInformation[]::new);
} else {
fieldTypes = new TypeInformation[]{inputType};
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index cdf4042..cb71dc7 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -17,105 +17,82 @@
*/
package org.apache.flink.table.api
-import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort}
-import java.math.{BigDecimal => JBigDecimal}
-import java.sql.{Date, Time, Timestamp}
-import java.time.{LocalDate, LocalDateTime, LocalTime}
-
import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.internal.BaseExpressions
+import org.apache.flink.table.expressions.ApiExpressionUtils._
import org.apache.flink.table.expressions._
-import ApiExpressionUtils._
import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserDefinedFunctionHelper, _}
import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.utils.TypeConversions
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Date, Time, Timestamp}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
import _root_.scala.language.implicitConversions
/**
* These are all the operations that can be used to construct an [[Expression]] AST for
* expression operations.
- *
- * These operations must be kept in sync with the parser in
- * [[org.apache.flink.table.expressions.ExpressionParser]].
*/
@PublicEvolving
-trait ImplicitExpressionOperations {
+trait ImplicitExpressionOperations extends BaseExpressions[Expression, Expression] {
private[flink] def expr: Expression
+ override def toExpr: Expression = expr
+
+ override protected def toApiSpecificExpression(expression: Expression): Expression = expression
+
/**
- * Enables literals on left side of binary expressions.
- *
- * e.g. 12.toExpr % 'a
- *
- * @return expression
- */
- def toExpr: Expression = expr
+ * Specifies a name for an expression i.e. a field.
+ *
+ * @param name name for one field
+ * @param extraNames additional names if the expression expands to multiple fields
+ * @return field with an alias
+ */
+ def as(name: Symbol, extraNames: Symbol*): Expression = as(name.name, extraNames.map(_.name): _*)
/**
* Boolean AND in three-valued logic.
*/
- def && (other: Expression): Expression = unresolvedCall(AND, expr, other)
+ def && (other: Expression): Expression = and(other)
/**
* Boolean OR in three-valued logic.
*/
- def || (other: Expression): Expression = unresolvedCall(OR, expr, other)
+ def || (other: Expression): Expression = or(other)
/**
* Greater than.
*/
- def > (other: Expression): Expression = unresolvedCall(GREATER_THAN, expr, other)
+ def > (other: Expression): Expression = isGreater(other)
/**
* Greater than or equal.
*/
- def >= (other: Expression): Expression = unresolvedCall(GREATER_THAN_OR_EQUAL, expr, other)
+ def >= (other: Expression): Expression = isGreaterOrEqual(other)
/**
* Less than.
*/
- def < (other: Expression): Expression = unresolvedCall(LESS_THAN, expr, other)
+ def < (other: Expression): Expression = isLess(other)
/**
* Less than or equal.
*/
- def <= (other: Expression): Expression = unresolvedCall(LESS_THAN_OR_EQUAL, expr, other)
+ def <= (other: Expression): Expression = isLessOrEqual(other)
/**
* Equals.
*/
- def === (other: Expression): Expression = unresolvedCall(EQUALS, expr, other)
+ def === (other: Expression): Expression = isEqual(other)
/**
* Not equal.
*/
- def !== (other: Expression): Expression = unresolvedCall(NOT_EQUALS, expr, other)
-
- /**
- * Returns true if the given expression is between lowerBound and upperBound (both inclusive).
- * False otherwise. The parameters must be numeric types or identical comparable types.
- *
- * @param lowerBound numeric or comparable expression
- * @param upperBound numeric or comparable expression
- * @return boolean or null
- */
- def between(lowerBound: Expression, upperBound: Expression): Expression =
- unresolvedCall(BETWEEN, expr, lowerBound, upperBound)
-
- /**
- * Returns true if the given expression is not between lowerBound and upperBound (both
- * inclusive). False otherwise. The parameters must be numeric types or identical
- * comparable types.
- *
- * @param lowerBound numeric or comparable expression
- * @param upperBound numeric or comparable expression
- * @return boolean or null
- */
- def notBetween(lowerBound: Expression, upperBound: Expression): Expression =
- unresolvedCall(NOT_BETWEEN, expr, lowerBound, upperBound)
+ def !== (other: Expression): Expression = isNotEqual(other)
/**
* Whether boolean expression is not true; returns null if boolean is null.
@@ -125,7 +102,7 @@ trait ImplicitExpressionOperations {
/**
* Returns negative numeric.
*/
- def unary_- : Expression = unresolvedCall(MINUS_PREFIX, expr)
+ def unary_- : Expression = Expressions.negative(expr)
/**
* Returns numeric.
@@ -133,54 +110,24 @@ trait ImplicitExpressionOperations {
def unary_+ : Expression = expr
/**
- * Returns true if the given expression is null.
- */
- def isNull: Expression = unresolvedCall(IS_NULL, expr)
-
- /**
- * Returns true if the given expression is not null.
- */
- def isNotNull: Expression = unresolvedCall(IS_NOT_NULL, expr)
-
- /**
- * Returns true if given boolean expression is true. False otherwise (for null and false).
- */
- def isTrue: Expression = unresolvedCall(IS_TRUE, expr)
-
- /**
- * Returns true if given boolean expression is false. False otherwise (for null and true).
- */
- def isFalse: Expression = unresolvedCall(IS_FALSE, expr)
-
- /**
- * Returns true if given boolean expression is not true (for null and false). False otherwise.
- */
- def isNotTrue: Expression = unresolvedCall(IS_NOT_TRUE, expr)
-
- /**
- * Returns true if given boolean expression is not false (for null and true). False otherwise.
- */
- def isNotFalse: Expression = unresolvedCall(IS_NOT_FALSE, expr)
-
- /**
* Returns left plus right.
*/
- def + (other: Expression): Expression = unresolvedCall(PLUS, expr, other)
+ def + (other: Expression): Expression = plus(other)
/**
* Returns left minus right.
*/
- def - (other: Expression): Expression = unresolvedCall(MINUS, expr, other)
+ def - (other: Expression): Expression = minus(other)
/**
* Returns left divided by right.
*/
- def / (other: Expression): Expression = unresolvedCall(DIVIDE, expr, other)
+ def / (other: Expression): Expression = dividedBy(other)
/**
* Returns left multiplied by right.
*/
- def * (other: Expression): Expression = unresolvedCall(TIMES, expr, other)
+ def * (other: Expression): Expression = times(other)
/**
* Returns the remainder (modulus) of left divided by right.
@@ -194,352 +141,23 @@ trait ImplicitExpressionOperations {
*
* e.g. withColumns(1 to 3)
*/
- def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, other)
-
- /**
- * Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), declares that an
- * aggregation function is only applied on distinct input values.
- *
- * For example:
- *
- * {{{
- * orders
- * .groupBy('a)
- * .select('a, 'b.sum.distinct as 'd)
- * }}}
- */
- def distinct: Expression = unresolvedCall(DISTINCT, expr)
-
- /**
- * Returns the sum of the numeric field across all input values.
- * If all values are null, null is returned.
- */
- def sum: Expression = unresolvedCall(SUM, expr)
-
- /**
- * Returns the sum of the numeric field across all input values.
- * If all values are null, 0 is returned.
- */
- def sum0: Expression = unresolvedCall(SUM0, expr)
-
- /**
- * Returns the minimum value of field across all input values.
- */
- def min: Expression = unresolvedCall(MIN, expr)
-
- /**
- * Returns the maximum value of field across all input values.
- */
- def max: Expression = unresolvedCall(MAX, expr)
-
- /**
- * Returns the number of input rows for which the field is not null.
- */
- def count: Expression = unresolvedCall(COUNT, expr)
-
- /**
- * Returns the average (arithmetic mean) of the numeric field across all input values.
- */
- def avg: Expression = unresolvedCall(AVG, expr)
-
- /**
- * Returns the population standard deviation of an expression (the square root of varPop()).
- */
- def stddevPop: Expression = unresolvedCall(STDDEV_POP, expr)
-
- /**
- * Returns the sample standard deviation of an expression (the square root of varSamp()).
- */
- def stddevSamp: Expression = unresolvedCall(STDDEV_SAMP, expr)
-
- /**
- * Returns the population standard variance of an expression.
- */
- def varPop: Expression = unresolvedCall(VAR_POP, expr)
-
- /**
- * Returns the sample variance of a given expression.
- */
- def varSamp: Expression = unresolvedCall(VAR_SAMP, expr)
-
- /**
- * Returns multiset aggregate of a given expression.
- */
- def collect: Expression = unresolvedCall(COLLECT, expr)
-
- /**
- * Converts a value to a given data type.
- *
- * e.g. "42".cast(DataTypes.INT()) leads to 42.
- *
- * @return casted expression
- */
- def cast(toType: DataType): Expression =
- unresolvedCall(CAST, expr, typeLiteral(toType))
-
- /**
- * @deprecated This method will be removed in future versions as it uses the old type system. It
- * is recommended to use [[cast(DataType)]] instead which uses the new type system
- * based on [[DataTypes]]. Please make sure to use either the old or the new type
- * system consistently to avoid unintended behavior. See the website documentation
- * for more information.
- */
- @deprecated
- def cast(toType: TypeInformation[_]): Expression =
- unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(toType)))
-
- /**
- * Specifies a name for an expression i.e. a field.
- *
- * @param name name for one field
- * @param extraNames additional names if the expression expands to multiple fields
- * @return field with an alias
- */
- def as(name: Symbol, extraNames: Symbol*): Expression =
- unresolvedCall(
- AS,
- expr +: valueLiteral(name.name) +: extraNames.map(name => valueLiteral(name.name)): _*)
-
- /**
- * Specifies ascending order of an expression i.e. a field for orderBy call.
- *
- * @return ascend expression
- */
- def asc: Expression = unresolvedCall(ORDER_ASC, expr)
-
- /**
- * Specifies descending order of an expression i.e. a field for orderBy call.
- *
- * @return descend expression
- */
- def desc: Expression = unresolvedCall(ORDER_DESC, expr)
-
- /**
- * Returns true if an expression exists in a given list of expressions. This is a shorthand
- * for multiple OR conditions.
- *
- * If the testing set contains null, the result will be null if the element can not be found
- * and true if it can be found. If the element is null, the result is always null.
- *
- * e.g. "42".in(1, 2, 3) leads to false.
- */
- def in(elements: Expression*): Expression = unresolvedCall(IN, expr +: elements: _*)
-
- /**
- * Returns true if an expression exists in a given table sub-query. The sub-query table
- * must consist of one column. This column must have the same data type as the expression.
- *
- * Note: This operation is not supported in a streaming environment yet.
- */
- def in(table: Table): Expression = unresolvedCall(IN, expr, tableRef(table.toString, table))
-
- /**
- * Returns the start time (inclusive) of a window when applied on a window reference.
- */
- def start: Expression = unresolvedCall(WINDOW_START, expr)
-
- /**
- * Returns the end time (exclusive) of a window when applied on a window reference.
- *
- * e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000.
- */
- def end: Expression = unresolvedCall(WINDOW_END, expr)
+ def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, objectToExpression(other))
/**
* Ternary conditional operator that decides which of two other expressions should be
* based on a evaluated boolean condition.
*
- * e.g. (42 > 5).?("A", "B") leads to "A"
+ * e.g. ($"f0" > 5).?("A", "B") leads to "A"
*
* @param ifTrue expression to be evaluated if condition holds
* @param ifFalse expression to be evaluated if condition does not hold
*/
def ?(ifTrue: Expression, ifFalse: Expression): Expression =
- unresolvedCall(IF, expr, ifTrue, ifFalse)
+ Expressions.ifThenElse(expr, ifTrue, ifFalse).toExpr
// scalar functions
/**
- * Calculates the remainder of division the given number by another one.
- */
- def mod(other: Expression): Expression = unresolvedCall(MOD, expr, other)
-
- /**
- * Calculates the Euler's number raised to the given power.
- */
- def exp(): Expression = unresolvedCall(EXP, expr)
-
- /**
- * Calculates the base 10 logarithm of the given value.
- */
- def log10(): Expression = unresolvedCall(LOG10, expr)
-
- /**
- * Calculates the base 2 logarithm of the given value.
- */
- def log2(): Expression = unresolvedCall(LOG2, expr)
-
- /**
- * Calculates the natural logarithm of the given value.
- */
- def ln(): Expression = unresolvedCall(LN, expr)
-
- /**
- * Calculates the natural logarithm of the given value.
- */
- def log(): Expression = unresolvedCall(LOG, expr)
-
- /**
- * Calculates the logarithm of the given value to the given base.
- */
- def log(base: Expression): Expression = unresolvedCall(LOG, base, expr)
-
- /**
- * Calculates the given number raised to the power of the other value.
- */
- def power(other: Expression): Expression = unresolvedCall(POWER, expr, other)
-
- /**
- * Calculates the hyperbolic cosine of a given value.
- */
- def cosh(): Expression = unresolvedCall(COSH, expr)
-
- /**
- * Calculates the square root of a given value.
- */
- def sqrt(): Expression = unresolvedCall(SQRT, expr)
-
- /**
- * Calculates the absolute value of given value.
- */
- def abs(): Expression = unresolvedCall(ABS, expr)
-
- /**
- * Calculates the largest integer less than or equal to a given number.
- */
- def floor(): Expression = unresolvedCall(FLOOR, expr)
-
- /**
- * Calculates the hyperbolic sine of a given value.
- */
- def sinh(): Expression = unresolvedCall(SINH, expr)
-
- /**
- * Calculates the smallest integer greater than or equal to a given number.
- */
- def ceil(): Expression = unresolvedCall(CEIL, expr)
-
- /**
- * Calculates the sine of a given number.
- */
- def sin(): Expression = unresolvedCall(SIN, expr)
-
- /**
- * Calculates the cosine of a given number.
- */
- def cos(): Expression = unresolvedCall(COS, expr)
-
- /**
- * Calculates the tangent of a given number.
- */
- def tan(): Expression = unresolvedCall(TAN, expr)
-
- /**
- * Calculates the cotangent of a given number.
- */
- def cot(): Expression = unresolvedCall(COT, expr)
-
- /**
- * Calculates the arc sine of a given number.
- */
- def asin(): Expression = unresolvedCall(ASIN, expr)
-
- /**
- * Calculates the arc cosine of a given number.
- */
- def acos(): Expression = unresolvedCall(ACOS, expr)
-
- /**
- * Calculates the arc tangent of a given number.
- */
- def atan(): Expression = unresolvedCall(ATAN, expr)
-
- /**
- * Calculates the hyperbolic tangent of a given number.
- */
- def tanh(): Expression = unresolvedCall(TANH, expr)
-
- /**
- * Converts numeric from radians to degrees.
- */
- def degrees(): Expression = unresolvedCall(DEGREES, expr)
-
- /**
- * Converts numeric from degrees to radians.
- */
- def radians(): Expression = unresolvedCall(RADIANS, expr)
-
- /**
- * Calculates the signum of a given number.
- */
- def sign(): Expression = unresolvedCall(SIGN, expr)
-
- /**
- * Rounds the given number to integer places right to the decimal point.
- */
- def round(places: Expression): Expression = unresolvedCall(ROUND, expr, places)
-
- /**
- * Returns a string representation of an integer numeric value in binary format. Returns null if
- * numeric is null. E.g. "4" leads to "100", "12" leads to "1100".
- */
- def bin(): Expression = unresolvedCall(BIN, expr)
-
- /**
- * Returns a string representation of an integer numeric value or a string in hex format. Returns
- * null if numeric or string is null.
- *
- * E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads
- * to "68656c6c6f2c776f726c64".
- */
- def hex(): Expression = unresolvedCall(HEX, expr)
-
- /**
- * Returns a number of truncated to n decimal places.
- * If n is 0,the result has no decimal point or fractional part.
- * n can be negative to cause n digits left of the decimal point of the value to become zero.
- * E.g. truncate(42.345, 2) to 42.34.
- */
- def truncate(n: Expression): Expression = unresolvedCall(TRUNCATE, expr, n)
-
- /**
- * Returns a number of truncated to 0 decimal places.
- * E.g. truncate(42.345) to 42.0.
- */
- def truncate(): Expression = unresolvedCall(TRUNCATE, expr)
-
- // String operations
-
- /**
- * Creates a substring of the given string at given index for a given length.
- *
- * @param beginIndex first character of the substring (starting at 1, inclusive)
- * @param length number of characters of the substring
- * @return substring
- */
- def substring(beginIndex: Expression, length: Expression): Expression =
- unresolvedCall(SUBSTRING, expr, beginIndex, length)
-
- /**
- * Creates a substring of the given string beginning at the given index to the end.
- *
- * @param beginIndex first character of the substring (starting at 1, inclusive)
- * @return substring
- */
- def substring(beginIndex: Expression): Expression =
- unresolvedCall(SUBSTRING, expr, beginIndex)
-
- /**
* Removes leading and/or trailing characters from the given string.
*
* @param removeLeading if true, remove leading characters (default: true)
@@ -552,324 +170,14 @@ trait ImplicitExpressionOperations {
removeTrailing: Boolean = true,
character: Expression = valueLiteral(" "))
: Expression = {
- unresolvedCall(TRIM, valueLiteral(removeLeading), valueLiteral(removeTrailing), character, expr)
+ unresolvedCall(
+ TRIM,
+ valueLiteral(removeLeading),
+ valueLiteral(removeTrailing),
+ ApiExpressionUtils.objectToExpression(character),
+ expr)
}
- /**
- * Returns a new string which replaces all the occurrences of the search target
- * with the replacement string (non-overlapping).
- */
- def replace(search: Expression, replacement: Expression): Expression =
- unresolvedCall(REPLACE, expr, search, replacement)
-
- /**
- * Returns the length of a string.
- */
- def charLength(): Expression = unresolvedCall(CHAR_LENGTH, expr)
-
- /**
- * Returns all of the characters in a string in upper case using the rules of
- * the default locale.
- */
- def upperCase(): Expression = unresolvedCall(UPPER, expr)
-
- /**
- * Returns all of the characters in a string in lower case using the rules of
- * the default locale.
- */
- def lowerCase(): Expression = unresolvedCall(LOWER, expr)
-
- /**
- * Converts the initial letter of each word in a string to uppercase.
- * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace.
- */
- def initCap(): Expression = unresolvedCall(INIT_CAP, expr)
-
- /**
- * Returns true, if a string matches the specified LIKE pattern.
- *
- * e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
- */
- def like(pattern: Expression): Expression = unresolvedCall(LIKE, expr, pattern)
-
- /**
- * Returns true, if a string matches the specified SQL regex pattern.
- *
- * e.g. "A+" matches all strings that consist of at least one A
- */
- def similar(pattern: Expression): Expression = unresolvedCall(SIMILAR, expr, pattern)
-
- /**
- * Returns the position of string in an other string starting at 1.
- * Returns 0 if string could not be found.
- *
- * e.g. "a".position("bbbbba") leads to 6
- */
- def position(haystack: Expression): Expression = unresolvedCall(POSITION, expr, haystack)
-
- /**
- * Returns a string left-padded with the given pad string to a length of len characters. If
- * the string is longer than len, the return value is shortened to len characters.
- *
- * e.g. "hi".lpad(4, '??') returns "??hi", "hi".lpad(1, '??') returns "h"
- */
- def lpad(len: Expression, pad: Expression): Expression = unresolvedCall(LPAD, expr, len, pad)
-
- /**
- * Returns a string right-padded with the given pad string to a length of len characters. If
- * the string is longer than len, the return value is shortened to len characters.
- *
- * e.g. "hi".rpad(4, '??') returns "hi??", "hi".rpad(1, '??') returns "h"
- */
- def rpad(len: Expression, pad: Expression): Expression = unresolvedCall(RPAD, expr, len, pad)
-
- /**
- * Defines an aggregation to be used for a previously specified over window.
- *
- * For example:
- *
- * {{{
- * table
- * .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
- * .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
- * }}}
- */
- def over(alias: Expression): Expression = unresolvedCall(OVER, expr, alias)
-
- /**
- * Replaces a substring of string with a string starting at a position (starting at 1).
- *
- * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
- */
- def overlay(newString: Expression, starting: Expression): Expression =
- unresolvedCall(OVERLAY, expr, newString, starting)
-
- /**
- * Replaces a substring of string with a string starting at a position (starting at 1).
- * The length specifies how many characters should be removed.
- *
- * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
- */
- def overlay(newString: Expression, starting: Expression, length: Expression): Expression =
- unresolvedCall(OVERLAY, expr, newString, starting, length)
-
- /**
- * Returns a string with all substrings that match the regular expression consecutively
- * being replaced.
- */
- def regexpReplace(regex: Expression, replacement: Expression): Expression =
- unresolvedCall(REGEXP_REPLACE, expr, regex, replacement)
-
- /**
- * Returns a string extracted with a specified regular expression and a regex match group
- * index.
- */
- def regexpExtract(regex: Expression, extractIndex: Expression): Expression =
- unresolvedCall(REGEXP_EXTRACT, expr, regex, extractIndex)
-
- /**
- * Returns a string extracted with a specified regular expression.
- */
- def regexpExtract(regex: Expression): Expression =
- unresolvedCall(REGEXP_EXTRACT, expr, regex)
-
- /**
- * Returns the base string decoded with base64.
- */
- def fromBase64(): Expression = unresolvedCall(FROM_BASE64, expr)
-
- /**
- * Returns the base64-encoded result of the input string.
- */
- def toBase64(): Expression = unresolvedCall(TO_BASE64, expr)
-
- /**
- * Returns a string that removes the left whitespaces from the given string.
- */
- def ltrim(): Expression = unresolvedCall(LTRIM, expr)
-
- /**
- * Returns a string that removes the right whitespaces from the given string.
- */
- def rtrim(): Expression = unresolvedCall(RTRIM, expr)
-
- /**
- * Returns a string that repeats the base string n times.
- */
- def repeat(n: Expression): Expression = unresolvedCall(REPEAT, expr, n)
-
- // Temporal operations
-
- /**
- * Parses a date string in the form "yyyy-MM-dd" to a SQL Date.
- */
- def toDate: Expression =
- unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))
-
- /**
- * Parses a time string in the form "HH:mm:ss" to a SQL Time.
- */
- def toTime: Expression =
- unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))
-
- /**
- * Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp.
- */
- def toTimestamp: Expression =
- unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))
-
- /**
- * Extracts parts of a time point or time interval. Returns the part as a long value.
- *
- * e.g. "2006-06-05".toDate.extract(DAY) leads to 5
- */
- def extract(timeIntervalUnit: TimeIntervalUnit): Expression =
- unresolvedCall(EXTRACT, valueLiteral(timeIntervalUnit), expr)
-
- /**
- * Rounds down a time point to the given unit.
- *
- * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
- */
- def floor(timeIntervalUnit: TimeIntervalUnit): Expression =
- unresolvedCall(FLOOR, valueLiteral(timeIntervalUnit), expr)
-
- /**
- * Rounds up a time point to the given unit.
- *
- * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00
- */
- def ceil(timeIntervalUnit: TimeIntervalUnit): Expression =
- unresolvedCall(CEIL, valueLiteral(timeIntervalUnit), expr)
-
- // Interval types
-
- /**
- * Creates an interval of the given number of years.
- *
- * @return interval of months
- */
- def year: Expression = toMonthInterval(expr, 12)
-
- /**
- * Creates an interval of the given number of years.
- *
- * @return interval of months
- */
- def years: Expression = year
-
- /**
- * Creates an interval of the given number of quarters.
- *
- * @return interval of months
- */
- def quarter: Expression = toMonthInterval(expr, 3)
-
- /**
- * Creates an interval of the given number of quarters.
- *
- * @return interval of months
- */
- def quarters: Expression = quarter
-
- /**
- * Creates an interval of the given number of months.
- *
- * @return interval of months
- */
- def month: Expression = toMonthInterval(expr, 1)
-
- /**
- * Creates an interval of the given number of months.
- *
- * @return interval of months
- */
- def months: Expression = month
-
- /**
- * Creates an interval of the given number of weeks.
- *
- * @return interval of milliseconds
- */
- def week: Expression = toMilliInterval(expr, 7 * MILLIS_PER_DAY)
-
- /**
- * Creates an interval of the given number of weeks.
- *
- * @return interval of milliseconds
- */
- def weeks: Expression = week
-
- /**
- * Creates an interval of the given number of days.
- *
- * @return interval of milliseconds
- */
- def day: Expression = toMilliInterval(expr, MILLIS_PER_DAY)
-
- /**
- * Creates an interval of the given number of days.
- *
- * @return interval of milliseconds
- */
- def days: Expression = day
-
- /**
- * Creates an interval of the given number of hours.
- *
- * @return interval of milliseconds
- */
- def hour: Expression = toMilliInterval(expr, MILLIS_PER_HOUR)
-
- /**
- * Creates an interval of the given number of hours.
- *
- * @return interval of milliseconds
- */
- def hours: Expression = hour
-
- /**
- * Creates an interval of the given number of minutes.
- *
- * @return interval of milliseconds
- */
- def minute: Expression = toMilliInterval(expr, MILLIS_PER_MINUTE)
-
- /**
- * Creates an interval of the given number of minutes.
- *
- * @return interval of milliseconds
- */
- def minutes: Expression = minute
-
- /**
- * Creates an interval of the given number of seconds.
- *
- * @return interval of milliseconds
- */
- def second: Expression = toMilliInterval(expr, MILLIS_PER_SECOND)
-
- /**
- * Creates an interval of the given number of seconds.
- *
- * @return interval of milliseconds
- */
- def seconds: Expression = second
-
- /**
- * Creates an interval of the given number of milliseconds.
- *
- * @return interval of milliseconds
- */
- def milli: Expression = toMilliInterval(expr, 1)
-
- /**
- * Creates an interval of the given number of milliseconds.
- *
- * @return interval of milliseconds
- */
- def millis: Expression = milli
-
// Row interval type
/**
@@ -879,121 +187,6 @@ trait ImplicitExpressionOperations {
*/
def rows: Expression = toRowInterval(expr)
- // Advanced type helper functions
-
- /**
- * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
- * returns it's value.
- *
- * @param name name of the field (similar to Flink's field expressions)
- * @return value of the field
- */
- def get(name: String): Expression = unresolvedCall(GET, expr, valueLiteral(name))
-
- /**
- * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
- * returns it's value.
- *
- * @param index position of the field
- * @return value of the field
- */
- def get(index: Int): Expression = unresolvedCall(GET, expr, valueLiteral(index))
-
- /**
- * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
- * into a flat representation where every subtype is a separate field.
- */
- def flatten(): Expression = unresolvedCall(FLATTEN, expr)
-
- /**
- * Accesses the element of an array or map based on a key or an index (starting at 1).
- *
- * @param index key or position of the element (array index starting at 1)
- * @return value of the element
- */
- def at(index: Expression): Expression = unresolvedCall(AT, expr, index)
-
- /**
- * Returns the number of elements of an array or number of entries of a map.
- *
- * @return number of elements or entries
- */
- def cardinality(): Expression = unresolvedCall(CARDINALITY, expr)
-
- /**
- * Returns the sole element of an array with a single element. Returns null if the array is
- * empty. Throws an exception if the array has more than one element.
- *
- * @return the first and only element of an array with a single element
- */
- def element(): Expression = unresolvedCall(ARRAY_ELEMENT, expr)
-
- // Time definition
-
- /**
- * Declares a field as the rowtime attribute for indicating, accessing, and working in
- * Flink's event time.
- */
- def rowtime: Expression = unresolvedCall(ROWTIME, expr)
-
- /**
- * Declares a field as the proctime attribute for indicating, accessing, and working in
- * Flink's processing time.
- */
- def proctime: Expression = unresolvedCall(PROCTIME, expr)
-
- // Hash functions
-
- /**
- * Returns the MD5 hash of the string argument; null if string is null.
- *
- * @return string of 32 hexadecimal digits or null
- */
- def md5(): Expression = unresolvedCall(MD5, expr)
-
- /**
- * Returns the SHA-1 hash of the string argument; null if string is null.
- *
- * @return string of 40 hexadecimal digits or null
- */
- def sha1(): Expression = unresolvedCall(SHA1, expr)
-
- /**
- * Returns the SHA-224 hash of the string argument; null if string is null.
- *
- * @return string of 56 hexadecimal digits or null
- */
- def sha224(): Expression = unresolvedCall(SHA224, expr)
-
- /**
- * Returns the SHA-256 hash of the string argument; null if string is null.
- *
- * @return string of 64 hexadecimal digits or null
- */
- def sha256(): Expression = unresolvedCall(SHA256, expr)
-
- /**
- * Returns the SHA-384 hash of the string argument; null if string is null.
- *
- * @return string of 96 hexadecimal digits or null
- */
- def sha384(): Expression = unresolvedCall(SHA384, expr)
-
- /**
- * Returns the SHA-512 hash of the string argument; null if string is null.
- *
- * @return string of 128 hexadecimal digits or null
- */
- def sha512(): Expression = unresolvedCall(SHA512, expr)
-
- /**
- * Returns the hash for the given string expression using the SHA-2 family of hash
- * functions (SHA-224, SHA-256, SHA-384, or SHA-512).
- *
- * @param hashLength bit length of the result (either 224, 256, 384, or 512)
- * @return string or null if one of the arguments is null.
- */
- def sha2(hashLength: Expression): Expression = unresolvedCall(SHA2, expr, hashLength)
}
/**
@@ -1109,7 +302,9 @@ trait ImplicitExpressionConversions {
* Calls a scalar function for the given parameters.
*/
def apply(params: Expression*): Expression = {
- unresolvedCall(new ScalarFunctionDefinition(s.getClass.getName, s), params:_*)
+ unresolvedCall(
+ new ScalarFunctionDefinition(s.getClass.getName, s),
+ params.map(ApiExpressionUtils.objectToExpression): _*)
}
}
@@ -1121,7 +316,9 @@ trait ImplicitExpressionConversions {
def apply(params: Expression*): Expression = {
val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
.getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]])
- unresolvedCall(new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo), params: _*)
+ unresolvedCall(
+ new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo),
+ params.map(ApiExpressionUtils.objectToExpression): _*)
}
}
@@ -1149,7 +346,9 @@ trait ImplicitExpressionConversions {
* Calls an aggregate function for the given parameters.
*/
def apply(params: Expression*): Expression = {
- unresolvedCall(createFunctionDefinition(), params: _*)
+ unresolvedCall(
+ createFunctionDefinition(),
+ params.map(ApiExpressionUtils.objectToExpression): _*)
}
/**
@@ -1160,6 +359,25 @@ trait ImplicitExpressionConversions {
}
}
+
+ /**
+ * Extends Scala's StringContext with a method for creating an unresolved reference via
+ * string interpolation.
+ */
+ implicit class FieldExpression(val sc: StringContext) {
+
+ /**
+ * Creates an unresolved reference to a table's field.
+ *
+ * Example:
+ * ```
+ * tab.select($"key", $"value")
+ * ```
+ * </pre>
+ */
+ def $(args: Any*): Expression = unresolvedRef(sc.s(args: _*))
+ }
+
implicit def tableSymbolToExpression(sym: TableSymbol): Expression =
valueLiteral(sym)
@@ -1169,7 +387,7 @@ trait ImplicitExpressionConversions {
implicit def scalaRange2RangeExpression(range: Range.Inclusive): Expression = {
val startExpression = valueLiteral(range.start)
val endExpression = valueLiteral(range.end)
- startExpression to endExpression
+ unresolvedCall(RANGE_TO, startExpression, endExpression)
}
implicit def byte2Literal(b: Byte): Expression = valueLiteral(b)
@@ -1284,47 +502,77 @@ trait ImplicitExpressionConversions {
* @see TableEnvironment#createTemporaryFunction
* @see TableEnvironment#createTemporarySystemFunction
*/
- def call(path: String, params: Expression*): Expression = {
- lookupCall(path, params: _*)
- }
+ def call(path: String, params: Expression*): Expression = Expressions.call(path, params: _*)
// ----------------------------------------------------------------------------------------------
// Implicit expressions in prefix notation
// ----------------------------------------------------------------------------------------------
/**
+ * Creates a SQL literal.
+ *
+ * The data type is derived from the object's class and its value.
+ *
+ * For example:
+ *
+ * - `lit(12)`` leads to `INT`
+ * - `lit("abc")`` leads to `CHAR(3)`
+ * - `lit(new java.math.BigDecimal("123.45"))` leads to `DECIMAL(5, 2)`
+ *
+ * See [[org.apache.flink.table.types.utils.ValueDataTypeConverter]] for a list of supported
+ * literal values.
+ */
+ def lit(v: Any): Expression = Expressions.lit(v)
+
+ /**
+ * Creates a SQL literal of a given [[DataType]].
+ *
+ * The method [[lit(Object)]] is preferred as it extracts the [[DataType]]
+ * automatically. The class of `v` must be supported according to the
+ * [[org.apache.flink.table.types.logical.LogicalType#supportsInputConversion(Class)]].
+ */
+ def lit(v: Any, dataType: DataType): Expression = Expressions.lit(v, dataType)
+
+ /**
+ * Returns negative numeric.
+ */
+ def negative(v: Expression): Expression = {
+ Expressions.negative(v)
+ }
+
+ /**
* Returns the current SQL date in UTC time zone.
*/
def currentDate(): Expression = {
- unresolvedCall(CURRENT_DATE)
+ Expressions.currentDate()
}
/**
* Returns the current SQL time in UTC time zone.
*/
def currentTime(): Expression = {
- unresolvedCall(CURRENT_TIME)
+ Expressions.currentTime()
}
/**
* Returns the current SQL timestamp in UTC time zone.
*/
def currentTimestamp(): Expression = {
- unresolvedCall(CURRENT_TIMESTAMP)
+ Expressions.currentTimestamp()
}
/**
* Returns the current SQL time in local time zone.
*/
def localTime(): Expression = {
- unresolvedCall(LOCAL_TIME)
+ Expressions.localTime()
}
/**
* Returns the current SQL timestamp in local time zone.
*/
def localTimestamp(): Expression = {
- unresolvedCall(LOCAL_TIMESTAMP)
+ Expressions.localTimestamp()
}
/**
@@ -1342,7 +590,7 @@ trait ImplicitExpressionConversions {
rightTimePoint: Expression,
rightTemporal: Expression)
: Expression = {
- unresolvedCall(TEMPORAL_OVERLAPS, leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
+ Expressions.temporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
}
/**
@@ -1360,7 +608,7 @@ trait ImplicitExpressionConversions {
timestamp: Expression,
format: Expression)
: Expression = {
- unresolvedCall(DATE_FORMAT, timestamp, format)
+ Expressions.dateFormat(timestamp, format)
}
/**
@@ -1379,49 +627,49 @@ trait ImplicitExpressionConversions {
timePoint1: Expression,
timePoint2: Expression)
: Expression = {
- unresolvedCall(TIMESTAMP_DIFF, timePointUnit, timePoint1, timePoint2)
+ Expressions.timestampDiff(timePointUnit, timePoint1, timePoint2)
}
/**
* Creates an array of literals.
*/
def array(head: Expression, tail: Expression*): Expression = {
- unresolvedCall(ARRAY, head +: tail: _*)
+ Expressions.array(head, tail: _*)
}
/**
* Creates a row of expressions.
*/
def row(head: Expression, tail: Expression*): Expression = {
- unresolvedCall(ROW, head +: tail: _*)
+ Expressions.row(head, tail: _*)
}
/**
* Creates a map of expressions.
*/
def map(key: Expression, value: Expression, tail: Expression*): Expression = {
- unresolvedCall(MAP, key +: value +: tail: _*)
+ Expressions.map(key, value, tail: _*)
}
/**
* Returns a value that is closer than any other value to pi.
*/
def pi(): Expression = {
- unresolvedCall(PI)
+ Expressions.pi()
}
/**
* Returns a value that is closer than any other value to e.
*/
def e(): Expression = {
- unresolvedCall(E)
+ Expressions.e()
}
/**
* Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
*/
def rand(): Expression = {
- unresolvedCall(RAND)
+ Expressions.rand()
}
/**
@@ -1430,7 +678,7 @@ trait ImplicitExpressionConversions {
* have same initial seed.
*/
def rand(seed: Expression): Expression = {
- unresolvedCall(RAND, seed)
+ Expressions.rand(seed)
}
/**
@@ -1438,7 +686,7 @@ trait ImplicitExpressionConversions {
* value (exclusive).
*/
def randInteger(bound: Expression): Expression = {
- unresolvedCall(RAND_INTEGER, bound)
+ Expressions.randInteger(bound)
}
/**
@@ -1447,7 +695,7 @@ trait ImplicitExpressionConversions {
* of numbers if they have same initial seed and same bound.
*/
def randInteger(seed: Expression, bound: Expression): Expression = {
- unresolvedCall(RAND_INTEGER, seed, bound)
+ Expressions.randInteger(seed, bound)
}
/**
@@ -1455,25 +703,38 @@ trait ImplicitExpressionConversions {
* Returns NULL if any argument is NULL.
*/
def concat(string: Expression, strings: Expression*): Expression = {
- unresolvedCall(CONCAT, string +: strings: _*)
+ Expressions.concat(string, strings: _*)
}
/**
* Calculates the arc tangent of a given coordinate.
*/
def atan2(y: Expression, x: Expression): Expression = {
- unresolvedCall(ATAN2, y, x)
+ Expressions.atan2(y, x)
}
/**
* Returns the string that results from concatenating the arguments and separator.
* Returns NULL If the separator is NULL.
*
- * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
+ * Note: This function does not skip empty strings. However, it does skip any NULL
* values after the separator argument.
+ * @deprecated use [[ImplicitExpressionConversions.concatWs()]]
**/
+ @deprecated
def concat_ws(separator: Expression, string: Expression, strings: Expression*): Expression = {
- unresolvedCall(CONCAT_WS, separator +: string +: strings: _*)
+ concatWs(separator, string, strings: _*)
+ }
+
+ /**
+ * Returns the string that results from concatenating the arguments and separator.
+ * Returns NULL If the separator is NULL.
+ *
+ * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
+ * values after the separator argument.
+ **/
+ def concatWs(separator: Expression, string: Expression, strings: Expression*): Expression = {
+ Expressions.concatWs(separator, string, strings: _*)
}
/**
@@ -1483,7 +744,7 @@ trait ImplicitExpressionConversions {
* generator.
*/
def uuid(): Expression = {
- unresolvedCall(UUID)
+ Expressions.uuid()
}
/**
@@ -1492,7 +753,7 @@ trait ImplicitExpressionConversions {
* e.g. nullOf(DataTypes.INT())
*/
def nullOf(dataType: DataType): Expression = {
- valueLiteral(null, dataType)
+ Expressions.nullOf(dataType)
}
/**
@@ -1503,21 +764,21 @@ trait ImplicitExpressionConversions {
* documentation for more information.
*/
def nullOf(typeInfo: TypeInformation[_]): Expression = {
- nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo))
+ Expressions.nullOf(typeInfo)
}
/**
* Calculates the logarithm of the given value.
*/
def log(value: Expression): Expression = {
- unresolvedCall(LOG, value)
+ Expressions.log(value)
}
/**
* Calculates the logarithm of the given value to the given base.
*/
def log(base: Expression, value: Expression): Expression = {
- unresolvedCall(LOG, base, value)
+ Expressions.log(base, value)
}
/**
@@ -1531,7 +792,7 @@ trait ImplicitExpressionConversions {
* @param ifFalse expression to be evaluated if condition does not hold
*/
def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
- unresolvedCall(IF, condition, ifTrue, ifFalse)
+ Expressions.ifThenElse(condition, ifTrue, ifFalse)
}
/**
@@ -1544,7 +805,7 @@ trait ImplicitExpressionConversions {
* e.g. withColumns('b to 'c) or withColumns('*)
*/
def withColumns(head: Expression, tail: Expression*): Expression = {
- unresolvedCall(WITH_COLUMNS, head +: tail: _*)
+ Expressions.withColumns(head, tail: _*)
}
/**
@@ -1558,6 +819,20 @@ trait ImplicitExpressionConversions {
* e.g. withoutColumns('b to 'c) or withoutColumns('c)
*/
def withoutColumns(head: Expression, tail: Expression*): Expression = {
- unresolvedCall(WITHOUT_COLUMNS, head +: tail: _*)
+ Expressions.withoutColumns(head, tail: _*)
+ }
+
+ /**
+ * Boolean AND in three-valued logic.
+ */
+ def and(predicate0: Expression, predicate1: Expression, predicates: Expression*): Expression = {
+ Expressions.and(predicate0, predicate1, predicates: _*)
+ }
+
+ /**
+ * Boolean OR in three-valued logic.
+ */
+ def or(predicate0: Expression, predicate1: Expression, predicates: Expression*): Expression = {
+ Expressions.or(predicate0, predicate1, predicates: _*)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 06a9b5f..fadf854 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.functions._
import org.apache.flink.table.planner.expressions.{E => PlannerE, UUID => PlannerUUID}
import org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THROW_EXCEPTION
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
+import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL}
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
import _root_.scala.collection.JavaConverters._
@@ -151,12 +151,12 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
expr
case AND =>
- assert(args.size == 2)
- And(args.head, args.last)
+ assert(args.size >= 2)
+ args.reduceLeft(And)
case OR =>
- assert(args.size == 2)
- Or(args.head, args.last)
+ assert(args.size >= 2)
+ args.reduceLeft(Or)
case NOT =>
assert(args.size == 1)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index cb9f6b1..7b2c999 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -133,12 +133,12 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
expr
case AND =>
- assert(args.size == 2)
- And(args.head, args.last)
+ assert(args.size >= 2)
+ args.reduceLeft(And)
case OR =>
- assert(args.size == 2)
- Or(args.head, args.last)
+ assert(args.size >= 2)
+ args.reduceLeft(Or)
case NOT =>
assert(args.size == 1)