You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/03/01 16:40:41 UTC

[flink] 01/04: [FLINK-16033][table-api] Introduced Java Table API Expression DSL

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 029d578e098e2b02281d5c320da55c06e239b2b9
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Feb 11 08:25:12 2020 +0100

    [FLINK-16033][table-api] Introduced Java Table API Expression DSL
---
 .../org/apache/flink/table/api/ApiExpression.java  |   66 +
 .../org/apache/flink/table/api/Expressions.java    |  549 +++++++++
 .../org/apache/flink/table/api/GroupWindow.java    |    5 +-
 .../org/apache/flink/table/api/SessionWithGap.java |    3 +-
 .../flink/table/api/SessionWithGapOnTime.java      |    5 +-
 .../table/api/SessionWithGapOnTimeWithAlias.java   |    3 +-
 .../org/apache/flink/table/api/SlideWithSize.java  |    3 +-
 .../flink/table/api/SlideWithSizeAndSlide.java     |    5 +-
 .../table/api/SlideWithSizeAndSlideOnTime.java     |   13 +-
 .../api/SlideWithSizeAndSlideOnTimeWithAlias.java  |   13 +-
 .../org/apache/flink/table/api/TumbleWithSize.java |    3 +-
 .../flink/table/api/TumbleWithSizeOnTime.java      |    5 +-
 .../table/api/TumbleWithSizeOnTimeWithAlias.java   |    3 +-
 .../flink/table/api/internal/BaseExpressions.java  | 1284 ++++++++++++++++++++
 .../apache/flink/table/api/internal/TableImpl.java |   37 +-
 .../table/expressions/ApiExpressionUtils.java      |   39 +-
 .../table/expressions/LookupCallExpression.java    |    3 +-
 .../expressions/UnresolvedCallExpression.java      |    7 +-
 .../expressions/resolver/ExpressionResolver.java   |    2 +
 .../expressions/resolver/LookupCallResolver.java   |   12 +
 .../resolver/rules/OverWindowResolverRule.java     |    2 +-
 .../expressions/resolver/rules/ResolverRules.java  |    6 +
 .../resolver/rules/UnwrapApiExpressionRule.java}   |   30 +-
 .../flink/table/typeutils/FieldInfoUtils.java      |   14 +-
 .../org/apache/flink/table/api/expressionDsl.scala | 1045 +++-------------
 .../expressions/PlannerExpressionConverter.scala   |   10 +-
 .../expressions/PlannerExpressionConverter.scala   |    8 +-
 27 files changed, 2205 insertions(+), 970 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java
new file mode 100644
index 0000000..0e2027d
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.table.api.internal.BaseExpressions;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+
+import java.util.List;
+
+/**
+ * Java API class that gives access to expression operations.
+ *
+ * @see BaseExpressions
+ */
+public final class ApiExpression extends BaseExpressions<Object, ApiExpression> implements Expression {
+	private final Expression wrappedExpression;
+
+	@Override
+	public String asSummaryString() {
+		return wrappedExpression.asSummaryString();
+	}
+
+	ApiExpression(Expression wrappedExpression) {
+		if (wrappedExpression instanceof ApiExpression) {
+			throw new UnsupportedOperationException("This is a bug. Please file an issue.");
+		}
+		this.wrappedExpression = wrappedExpression;
+	}
+
+	@Override
+	public Expression toExpr() {
+		return wrappedExpression;
+	}
+
+	@Override
+	protected ApiExpression toApiSpecificExpression(Expression expression) {
+		return new ApiExpression(expression);
+	}
+
+	@Override
+	public List<Expression> getChildren() {
+		return wrappedExpression.getChildren();
+	}
+
+	@Override
+	public <R> R accept(ExpressionVisitor<R> visitor) {
+		return wrappedExpression.accept(visitor);
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
new file mode 100644
index 0000000..9e0c10e
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TimePointUnit;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.types.utils.ValueDataTypeConverter;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+
+/**
+ * Entry point of the Table API Expression DSL such as: {@code $("myField").plus(10).abs()}
+ *
+ * <p>This class contains static methods for referencing table columns, creating literals,
+ * and building more complex {@link Expression} chains. {@link ApiExpression ApiExpressions} are
+ * pure API entities that are further translated into {@link ResolvedExpression ResolvedExpressions}
+ * under the hood.
+ *
+ * <p>For fluent definition of expressions and easier readability, we recommend adding a
+ * static star import for the methods of this class:
+ *
+ * <pre>
+ * import static org.apache.flink.table.api.Expressions.*;
+ * </pre>
+ *
+ * <p>Check the documentation for more programming language specific APIs, for example, by using
+ * Scala implicits.
+ */
+@PublicEvolving
+public final class Expressions {
+	/**
+	 * Creates an unresolved reference to a table's field.
+	 *
+	 * <p>Example:
+	 * <pre>{@code
+	 *   tab.select($("key"), $("value"))
+	 * }
+	 * </pre>
+	 */
+	//CHECKSTYLE.OFF: MethodName
+	public static ApiExpression $(String name) {
+		return new ApiExpression(unresolvedRef(name));
+	}
+	//CHECKSTYLE.ON: MethodName
+
+	/**
+	 * Creates a SQL literal.
+	 *
+	 * <p>The data type is derived from the object's class and its value.
+	 *
+	 * <p>For example:
+	 * <ul>
+	 *     <li>{@code lit(12)} leads to {@code INT}</li>
+	 *     <li>{@code lit("abc")} leads to {@code CHAR(3)}</li>
+	 *     <li>{@code lit(new BigDecimal("123.45"))} leads to {@code DECIMAL(5, 2)}</li>
+	 * </ul>
+	 *
+	 * <p>See {@link ValueDataTypeConverter} for a list of supported literal values.
+	 */
+	public static ApiExpression lit(Object v) {
+		return new ApiExpression(valueLiteral(v));
+	}
+
+	/**
+	 * Creates a SQL literal of a given {@link DataType}.
+	 *
+	 * <p>The method {@link #lit(Object)} is preferred as it extracts the {@link DataType} automatically.
+	 * Use this method only when necessary. The class of {@code v} must be supported according to the
+	 * {@link org.apache.flink.table.types.logical.LogicalType#supportsInputConversion(Class)}.
+	 */
+	public static ApiExpression lit(Object v, DataType dataType) {
+		return new ApiExpression(valueLiteral(v, dataType));
+	}
+
+	/**
+	 * Indicates a range from 'start' to 'end', which can be used in columns
+	 * selection.
+	 *
+	 * <p>Example:
+	 * <pre>{@code
+	 * Table table = ...
+	 * table.select(withColumns(range("b", "c")))
+	 * }</pre>
+	 *
+	 * @see #withColumns(Object, Object...)
+	 * @see #withoutColumns(Object, Object...)
+	 */
+	public static ApiExpression range(String start, String end) {
+		return apiCall(BuiltInFunctionDefinitions.RANGE_TO, unresolvedRef(start), unresolvedRef(end));
+	}
+
+	/**
+	 * Indicates an index based range, which can be used in columns selection.
+	 *
+	 * <p>Example:
+	 * <pre>{@code
+	 * Table table = ...
+	 * table.select(withColumns(range(3, 4)))
+	 * }</pre>
+	 *
+	 * @see #withColumns(Object, Object...)
+	 * @see #withoutColumns(Object, Object...)
+	 */
+	public static ApiExpression range(int start, int end) {
+		return apiCall(BuiltInFunctionDefinitions.RANGE_TO, valueLiteral(start), valueLiteral(end));
+	}
+
+	/**
+	 * Boolean AND in three-valued logic.
+	 */
+	public static ApiExpression and(Object predicate0, Object predicate1, Object... predicates) {
+		return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.AND, predicate0, predicate1, predicates);
+	}
+
+	/**
+	 * Boolean OR in three-valued logic.
+	 */
+	public static ApiExpression or(Object predicate0, Object predicate1, Object... predicates) {
+		return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.OR, predicate0, predicate1, predicates);
+	}
+
+	/**
+	 * Offset constant to be used in the {@code preceding} clause of unbounded {@code Over} windows. Use this
+	 * constant for a row-count interval. Unbounded over windows start with the first row of a partition.
+	 */
+	public static final ApiExpression UNBOUNDED_ROW = apiCall(BuiltInFunctionDefinitions.UNBOUNDED_ROW);
+
+	/**
+	 * Offset constant to be used in the {@code preceding} clause of unbounded {@link Over} windows. Use this
+	 * constant for a time interval. Unbounded over windows start with the first row of a
+	 * partition.
+	 */
+	public static final ApiExpression UNBOUNDED_RANGE = apiCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE);
+
+	/**
+	 * Offset constant to be used in the {@code following} clause of {@link Over} windows. Use this for setting
+	 * the upper bound of the window to the current row.
+	 */
+	public static final ApiExpression CURRENT_ROW = apiCall(BuiltInFunctionDefinitions.CURRENT_ROW);
+
+	/**
+	 * Offset constant to be used in the {@code following} clause of {@link Over} windows. Use this for setting
+	 * the upper bound of the window to the sort key of the current row, i.e., all rows with the same
+	 * sort key as the current row are included in the window.
+	 */
+	public static final ApiExpression CURRENT_RANGE = apiCall(BuiltInFunctionDefinitions.CURRENT_RANGE);
+
+	/**
+	 * Returns the current SQL date in UTC time zone.
+	 */
+	public static ApiExpression currentDate() {
+		return apiCall(BuiltInFunctionDefinitions.CURRENT_DATE);
+	}
+
+	/**
+	 * Returns the current SQL time in UTC time zone.
+	 */
+	public static ApiExpression currentTime() {
+		return apiCall(BuiltInFunctionDefinitions.CURRENT_TIME);
+	}
+
+	/**
+	 * Returns the current SQL timestamp in UTC time zone.
+	 */
+	public static ApiExpression currentTimestamp() {
+		return apiCall(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP);
+	}
+
+	/**
+	 * Returns the current SQL time in local time zone.
+	 */
+	public static ApiExpression localTime() {
+		return apiCall(BuiltInFunctionDefinitions.LOCAL_TIME);
+	}
+
+	/**
+	 * Returns the current SQL timestamp in local time zone.
+	 */
+	public static ApiExpression localTimestamp() {
+		return apiCall(BuiltInFunctionDefinitions.LOCAL_TIMESTAMP);
+	}
+
+	/**
+	 * Determines whether two anchored time intervals overlap. Time point and temporal are
+	 * transformed into a range defined by two time points (start, end). The function
+	 * evaluates {@code leftEnd >= rightStart && rightEnd >= leftStart}.
+	 *
+	 * <p>It evaluates: {@code leftEnd >= rightStart && rightEnd >= leftStart}
+	 *
+	 * <p>e.g.
+	 * <pre>{@code
+	 * temporalOverlaps(
+	 *      lit("2:55:00").toTime(),
+	 *      interval(Duration.ofHours(1)),
+	 *      lit("3:30:00").toTime(),
+	 *      interval(Duration.ofHours(2)))
+	 * }</pre>
+	 * leads to true
+	 */
+	public static ApiExpression temporalOverlaps(
+			Object leftTimePoint,
+			Object leftTemporal,
+			Object rightTimePoint,
+			Object rightTemporal) {
+		return apiCall(
+			BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS,
+			leftTimePoint,
+			leftTemporal,
+			rightTimePoint,
+			rightTemporal);
+	}
+
+	/**
+	 * Formats a timestamp as a string using a specified format.
+	 * The format must be compatible with MySQL's date formatting syntax as used by the
+	 * date_parse function.
+	 *
+	 * <p>For example {@code dateFormat($("time"), "%Y, %d %M")} results in strings formatted as "2017, 05 May".
+	 *
+	 * @param timestamp The timestamp to format as string.
+	 * @param format The format of the string.
+	 * @return The formatted timestamp as string.
+	 */
+	public static ApiExpression dateFormat(
+			Object timestamp,
+			Object format) {
+		return apiCall(BuiltInFunctionDefinitions.DATE_FORMAT, timestamp, format);
+	}
+
+	/**
+	 * Returns the (signed) number of {@link TimePointUnit} between timePoint1 and timePoint2.
+	 *
+	 * <p>For example, {@code timestampDiff(TimePointUnit.DAY, lit("2016-06-15").toDate(), lit("2016-06-18").toDate())}
+	 * leads to 3.
+	 *
+	 * @param timePointUnit The unit to compute diff.
+	 * @param timePoint1 The first point in time.
+	 * @param timePoint2 The second point in time.
+	 * @return The number of intervals as integer value.
+	 */
+	public static ApiExpression timestampDiff(
+			TimePointUnit timePointUnit,
+			Object timePoint1,
+			Object timePoint2) {
+		return apiCall(BuiltInFunctionDefinitions.TIMESTAMP_DIFF, valueLiteral(timePointUnit), timePoint1, timePoint2);
+	}
+
+	/**
+	 * Creates an array of literals.
+	 */
+	public static ApiExpression array(Object head, Object... tail) {
+		return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ARRAY, head, tail);
+	}
+
+	/**
+	 * Creates a row of expressions.
+	 */
+	public static ApiExpression row(Object head, Object... tail) {
+		return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ROW, head, tail);
+	}
+
+	/**
+	 * Creates a map of expressions.
+	 *
+	 * <pre>{@code
+	 *  table.select(
+	 *      map(
+	 *          "key1", 1,
+	 *          "key2", 2,
+	 *          "key3", 3
+	 *      ))
+	 * }</pre>
+	 *
+	 * <p>Note keys and values should have the same types for all entries.
+	 */
+	public static ApiExpression map(Object key, Object value, Object... tail) {
+		return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.MAP, key, value, tail);
+	}
+
+	/**
+	 * Creates an interval of rows.
+	 *
+	 * @see Table#window(GroupWindow)
+	 * @see Table#window(OverWindow...)
+	 */
+	public static ApiExpression rowInterval(Long rows) {
+		return new ApiExpression(valueLiteral(rows));
+	}
+
+	/**
+	 * Returns a value that is closer than any other value to pi.
+	 */
+	public static ApiExpression pi() {
+		return apiCall(BuiltInFunctionDefinitions.PI);
+	}
+
+	/**
+	 * Returns a value that is closer than any other value to e.
+	 */
+	public static ApiExpression e() {
+		return apiCall(BuiltInFunctionDefinitions.E);
+	}
+
+	/**
+	 * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
+	 */
+	public static ApiExpression rand() {
+		return apiCall(BuiltInFunctionDefinitions.RAND);
+	}
+
+	/**
+	 * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an
+	 * initial seed. Two rand() functions will return identical sequences of numbers if they
+	 * have the same initial seed.
+	 */
+	public static ApiExpression rand(Object seed) {
+		return apiCall(BuiltInFunctionDefinitions.RAND, objectToExpression(seed));
+	}
+
+	/**
+	 * Returns a pseudorandom integer value between 0 (inclusive) and the specified
+	 * value (exclusive).
+	 */
+	public static ApiExpression randInteger(Object bound) {
+		return apiCall(BuiltInFunctionDefinitions.RAND_INTEGER, objectToExpression(bound));
+	}
+
+	/**
+	 * Returns a pseudorandom integer value between 0 (inclusive) and the specified value
+	 * (exclusive) with an initial seed. Two randInteger() functions will return identical sequences
+	 * of numbers if they have the same initial seed and the same bound.
+	 */
+	public static ApiExpression randInteger(Object seed, Object bound) {
+		return apiCall(BuiltInFunctionDefinitions.RAND_INTEGER, objectToExpression(seed), objectToExpression(bound));
+	}
+
+	/**
+	 * Returns the string that results from concatenating the arguments.
+	 * Returns NULL if any argument is NULL.
+	 */
+	public static ApiExpression concat(Object string, Object... strings) {
+		return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.CONCAT, string, strings);
+	}
+
+	/**
+	 * Calculates the arc tangent of a given coordinate.
+	 */
+	public static ApiExpression atan2(Object y, Object x) {
+		return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ATAN2, y, x);
+	}
+
+	/**
+	 * Returns negative numeric.
+	 */
+	public static ApiExpression negative(Object v) {
+		return apiCall(BuiltInFunctionDefinitions.MINUS_PREFIX, v);
+	}
+
+	/**
+	 * Returns the string that results from concatenating the arguments and separator.
+	 * Returns NULL if the separator is NULL.
+	 *
+	 * <p>Note: this function does not skip empty strings. However, it does skip any NULL
+	 * values after the separator argument.
+	 */
+	public static ApiExpression concatWs(Object separator, Object string, Object... strings) {
+		return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.CONCAT_WS, separator, string, strings);
+	}
+
+	/**
+	 * Returns a UUID (Universally Unique Identifier) string (e.g.,
+	 * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly
+	 * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
+	 * generator.
+	 */
+	public static ApiExpression uuid() {
+		return apiCall(BuiltInFunctionDefinitions.UUID);
+	}
+
+	/**
+	 * Returns a null literal value of a given data type.
+	 *
+	 * <p>e.g. {@code nullOf(DataTypes.INT())}
+	 */
+	public static ApiExpression nullOf(DataType dataType) {
+		return new ApiExpression(valueLiteral(null, dataType));
+	}
+
+	/**
+	 * @deprecated This method will be removed in future versions as it uses the old type system.
+	 *             It is recommended to use {@link #nullOf(DataType)} instead which uses the new type
+	 *             system based on {@link DataTypes}. Please make sure to use either the old or the new
+	 *             type system consistently to avoid unintended behavior. See the website
+	 *             documentation for more information.
+	 */
+	public static ApiExpression nullOf(TypeInformation<?> typeInfo) {
+		return nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo));
+	}
+
+	/**
+	 * Calculates the logarithm of the given value.
+	 */
+	public static ApiExpression log(Object value) {
+		return apiCall(BuiltInFunctionDefinitions.LOG, value);
+	}
+
+	/**
+	 * Calculates the logarithm of the given value to the given base.
+	 */
+	public static ApiExpression log(Object base, Object value) {
+		return apiCall(BuiltInFunctionDefinitions.LOG, base, value);
+	}
+
+	/**
+	 * Ternary conditional operator that decides which of two other expressions should be evaluated
+	 * based on an evaluated boolean condition.
+	 *
+	 * <p>e.g. ifThenElse($("f0") > 5, "A", "B") leads to "A"
+	 *
+	 * @param condition boolean condition
+	 * @param ifTrue expression to be evaluated if condition holds
+	 * @param ifFalse expression to be evaluated if condition does not hold
+	 */
+	public static ApiExpression ifThenElse(Object condition, Object ifTrue, Object ifFalse) {
+		return apiCall(BuiltInFunctionDefinitions.IF, condition, ifTrue, ifFalse);
+	}
+
+	/**
+	 * Creates an expression that selects a range of columns. It can be used wherever an array of
+	 * expression is accepted such as function calls, projections, or groupings.
+	 *
+	 * <p>A range can either be index-based or name-based. Indices start at 1 and boundaries are
+	 * inclusive.
+	 *
+	 * <p>e.g. withColumns(range("b", "c")) or withColumns($("*"))
+	 */
+	public static ApiExpression withColumns(Object head, Object... tail) {
+		return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITH_COLUMNS, head, tail);
+	}
+
+	/**
+	 * Creates an expression that selects all columns except for the given range of columns. It can
+	 * be used wherever an array of expression is accepted such as function calls, projections, or
+	 * groupings.
+	 *
+	 * <p>A range can either be index-based or name-based. Indices start at 1 and boundaries are
+	 * inclusive.
+	 *
+	 * <p>e.g. withoutColumns(range("b", "c")) or withoutColumns($("c"))
+	 */
+	public static ApiExpression withoutColumns(Object head, Object... tail) {
+		return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITHOUT_COLUMNS, head, tail);
+	}
+
+	/**
+	 * A call to a function that will be looked up in a catalog. There are two kinds of functions:
+	 * <ul>
+	 *     <li>System functions - which are identified with one-part names</li>
+	 *     <li>Catalog functions - which are always identified with three-part names
+	 *     (catalog, database, function)</li>
+	 * </ul>
+	 *
+	 * <p>Moreover, each function can either be a temporary function or a permanent one
+	 * (which is stored in an external catalog).
+	 *
+	 * <p>Based on these two properties, the resolution order for looking up a function based on
+	 * the provided {@code functionName} is as follows:
+	 * <ul>
+	 *     <li>Temporary system function</li>
+	 *     <li>System function</li>
+	 *     <li>Temporary catalog function</li>
+	 *     <li>Catalog function</li>
+	 * </ul>
+	 *
+	 * @see TableEnvironment#useCatalog(String)
+	 * @see TableEnvironment#useDatabase(String)
+	 * @see TableEnvironment#createTemporaryFunction
+	 * @see TableEnvironment#createTemporarySystemFunction
+	 */
+	public static ApiExpression call(String path, Object... params) {
+		return new ApiExpression(ApiExpressionUtils.lookupCall(
+			path,
+			Arrays.stream(params).map(ApiExpressionUtils::objectToExpression).toArray(Expression[]::new)));
+	}
+
+	private static ApiExpression apiCall(FunctionDefinition functionDefinition, Object... args) {
+		List<Expression> arguments =
+			Stream.of(args)
+				.map(ApiExpressionUtils::objectToExpression)
+				.collect(Collectors.toList());
+		return new ApiExpression(unresolvedCall(functionDefinition, arguments));
+	}
+
+	private static ApiExpression apiCallAtLeastOneArgument(FunctionDefinition functionDefinition,
+			Object arg0,
+			Object... args) {
+		List<Expression> arguments = Stream.concat(
+			Stream.of(arg0),
+			Stream.of(args)
+		).map(ApiExpressionUtils::objectToExpression)
+			.collect(Collectors.toList());
+		return new ApiExpression(unresolvedCall(functionDefinition, arguments));
+	}
+
+	private static ApiExpression apiCallAtLeastTwoArgument(
+			FunctionDefinition functionDefinition,
+			Object arg0,
+			Object arg1,
+			Object... args) {
+		List<Expression> arguments = Stream.concat(
+			Stream.of(arg0, arg1),
+			Stream.of(args)
+		).map(ApiExpressionUtils::objectToExpression)
+			.collect(Collectors.toList());
+		return new ApiExpression(unresolvedCall(functionDefinition, arguments));
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
index 4601e75..36cd855 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
 /**
@@ -41,8 +42,8 @@ public abstract class GroupWindow {
 	private final Expression timeField;
 
 	GroupWindow(Expression alias, Expression timeField) {
-		this.alias = alias;
-		this.timeField = timeField;
+		this.alias = ApiExpressionUtils.unwrapFromApi(alias);
+		this.timeField = ApiExpressionUtils.unwrapFromApi(timeField);
 	}
 
 	public Expression getAlias() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
index 6e646f7..0d9ee6d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -36,7 +37,7 @@ public final class SessionWithGap {
 	private final Expression gap;
 
 	SessionWithGap(Expression gap) {
-		this.gap = gap;
+		this.gap = ApiExpressionUtils.unwrapFromApi(gap);
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
index 53c9c4f..ef63dde 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -32,8 +33,8 @@ public final class SessionWithGapOnTime {
 	private final Expression gap;
 
 	SessionWithGapOnTime(Expression timeField, Expression gap) {
-		this.timeField = timeField;
-		this.gap = gap;
+		this.timeField = ApiExpressionUtils.unwrapFromApi(timeField);
+		this.gap = ApiExpressionUtils.unwrapFromApi(gap);
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
index 891ea7c..b88a93e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
 /**
@@ -31,7 +32,7 @@ public final class SessionWithGapOnTimeWithAlias extends GroupWindow {
 
 	SessionWithGapOnTimeWithAlias(Expression alias, Expression timeField, Expression gap) {
 		super(alias, timeField);
-		this.gap = gap;
+		this.gap = ApiExpressionUtils.unwrapFromApi(gap);
 	}
 
 	public Expression getGap() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
index 44470f9..983d07d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -32,7 +33,7 @@ public final class SlideWithSize {
 	private final Expression size;
 
 	SlideWithSize(Expression size) {
-		this.size = size;
+		this.size = ApiExpressionUtils.unwrapFromApi(size);
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
index 4d509d1..d4d83c1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -37,8 +38,8 @@ public final class SlideWithSizeAndSlide {
 	private final Expression slide;
 
 	SlideWithSizeAndSlide(Expression size, Expression slide) {
-		this.size = size;
-		this.slide = slide;
+		this.size = ApiExpressionUtils.unwrapFromApi(size);
+		this.slide = ApiExpressionUtils.unwrapFromApi(slide);
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
index 604b9cd..c74529a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -33,12 +34,12 @@ public final class SlideWithSizeAndSlideOnTime {
 	private final Expression slide;
 
 	SlideWithSizeAndSlideOnTime(
-		Expression timeField,
-		Expression size,
-		Expression slide) {
-		this.timeField = timeField;
-		this.size = size;
-		this.slide = slide;
+			Expression timeField,
+			Expression size,
+			Expression slide) {
+		this.timeField = ApiExpressionUtils.unwrapFromApi(timeField);
+		this.size = ApiExpressionUtils.unwrapFromApi(size);
+		this.slide = ApiExpressionUtils.unwrapFromApi(slide);
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
index 50b3692..8b9c692 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
 /**
@@ -31,13 +32,13 @@ public final class SlideWithSizeAndSlideOnTimeWithAlias extends GroupWindow {
 	private final Expression slide;
 
 	SlideWithSizeAndSlideOnTimeWithAlias(
-		Expression alias,
-		Expression timeField,
-		Expression size,
-		Expression slide) {
+			Expression alias,
+			Expression timeField,
+			Expression size,
+			Expression slide) {
 		super(alias, timeField);
-		this.size = size;
-		this.slide = slide;
+		this.size = ApiExpressionUtils.unwrapFromApi(size);
+		this.slide = ApiExpressionUtils.unwrapFromApi(slide);
 	}
 
 	public Expression getSize() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
index 600e3a1..bbc1825 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -36,7 +37,7 @@ public final class TumbleWithSize {
 	private Expression size;
 
 	TumbleWithSize(Expression size) {
-		this.size = size;
+		this.size = ApiExpressionUtils.unwrapFromApi(size);
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
index f635ba5..07616d1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -32,8 +33,8 @@ public final class TumbleWithSizeOnTime {
 	private final Expression size;
 
 	TumbleWithSizeOnTime(Expression time, Expression size) {
-		this.time = time;
-		this.size = size;
+		this.time = ApiExpressionUtils.unwrapFromApi(time);
+		this.size = ApiExpressionUtils.unwrapFromApi(size);
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
index 5180d33..4e25a4f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
 /**
@@ -31,7 +32,7 @@ public final class TumbleWithSizeOnTimeWithAlias extends GroupWindow {
 
 	TumbleWithSizeOnTimeWithAlias(Expression alias, Expression timeField, Expression size) {
 		super(alias, timeField);
-		this.size = size;
+		this.size = ApiExpressionUtils.unwrapFromApi(size);
 	}
 
 	public Expression getSize() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
new file mode 100644
index 0000000..41cb93c
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -0,0 +1,1284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_DAY;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_HOUR;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_MINUTE;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_SECOND;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.tableRef;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.toMilliInterval;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.toMonthInterval;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.*;
+//CHECKSTYLE.ON: AvoidStarImport|ImportOrder
+
+/**
+ * These are Java and Scala common operations that can be used to construct an {@link Expression} AST for
+ * expression operations.
+ *
+ * @param <InType>  The accepted type of input expressions, it is {@code Expression} for Scala and
+ *                  {@code Object} for Java. Generally the expression DSL works on expressions, the
+ *                  reason why Java accepts Object is to remove cumbersome call to {@code lit()} for
+ *                  literals. Scala alleviates this problem via implicit conversions.
+ * @param <OutType> The produced type of the DSL. It is {@code ApiExpression} for Java and {@code Expression}
+ *                  for Scala. In Scala the infix operations are included via implicit conversions. In Java
+ *                  we introduced a wrapper that enables the operations without pulling them through the whole stack.
+ */
+@PublicEvolving
+public abstract class BaseExpressions<InType, OutType> {
+	protected abstract Expression toExpr();
+
+	protected abstract OutType toApiSpecificExpression(Expression expression);
+
+	/**
+	 * Specifies a name for an expression i.e. a field.
+	 *
+	 * @param name       name for one field
+	 * @param extraNames additional names if the expression expands to multiple fields
+	 */
+	public OutType as(String name, String... extraNames) {
+		// AS(<this>, <name>, <extraNames>...): every alias is passed as a string literal.
+		final Stream<Expression> head = Stream.of(toExpr(), valueLiteral(name));
+		final Expression[] args = Stream
+			.concat(head, Arrays.stream(extraNames).map(ApiExpressionUtils::valueLiteral))
+			.toArray(Expression[]::new);
+		return toApiSpecificExpression(unresolvedCall(AS, args));
+	}
+
+	/**
+	 * Boolean AND in three-valued logic. This is an infix notation. See also
+	 * {@link Expressions#and(Object, Object, Object...)} for prefix notation with multiple arguments.
+	 *
+	 * @see Expressions#and(Object, Object, Object...)
+	 */
+	public OutType and(InType other) {
+		return toApiSpecificExpression(unresolvedCall(AND, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Boolean OR in three-valued logic. This is an infix notation. See also
+	 * {@link Expressions#or(Object, Object, Object...)} for prefix notation with multiple arguments.
+	 *
+	 * @see Expressions#or(Object, Object, Object...)
+	 */
+	public OutType or(InType other) {
+		return toApiSpecificExpression(unresolvedCall(OR, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Greater than.
+	 */
+	public OutType isGreater(InType other) {
+		return toApiSpecificExpression(unresolvedCall(GREATER_THAN, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Greater than or equal.
+	 */
+	public OutType isGreaterOrEqual(InType other) {
+		return toApiSpecificExpression(unresolvedCall(GREATER_THAN_OR_EQUAL, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Less than.
+	 */
+	public OutType isLess(InType other) {
+		return toApiSpecificExpression(unresolvedCall(LESS_THAN, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Less than or equal.
+	 */
+	public OutType isLessOrEqual(InType other) {
+		return toApiSpecificExpression(unresolvedCall(LESS_THAN_OR_EQUAL, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Equals.
+	 */
+	public OutType isEqual(InType other) {
+		return toApiSpecificExpression(unresolvedCall(EQUALS, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Not equal.
+	 */
+	public OutType isNotEqual(InType other) {
+		return toApiSpecificExpression(unresolvedCall(NOT_EQUALS, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Returns left plus right.
+	 */
+	public OutType plus(InType other) {
+		return toApiSpecificExpression(unresolvedCall(PLUS, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Returns left minus right.
+	 */
+	public OutType minus(InType other) {
+		return toApiSpecificExpression(unresolvedCall(MINUS, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Returns left divided by right.
+	 */
+	public OutType dividedBy(InType other) {
+		return toApiSpecificExpression(unresolvedCall(DIVIDE, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Returns left multiplied by right.
+	 */
+	public OutType times(InType other) {
+		return toApiSpecificExpression(unresolvedCall(TIMES, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Returns true if the given expression is between lowerBound and upperBound (both inclusive).
+	 * False otherwise. The parameters must be numeric types or identical comparable types.
+	 *
+	 * @param lowerBound numeric or comparable expression
+	 * @param upperBound numeric or comparable expression
+	 */
+	public OutType between(InType lowerBound, InType upperBound) {
+		return toApiSpecificExpression(unresolvedCall(
+			BETWEEN,
+			toExpr(),
+			objectToExpression(lowerBound),
+			objectToExpression(upperBound)));
+	}
+
+	/**
+	 * Returns true if the given expression is not between lowerBound and upperBound (both
+	 * inclusive). False otherwise. The parameters must be numeric types or identical
+	 * comparable types.
+	 *
+	 * @param lowerBound numeric or comparable expression
+	 * @param upperBound numeric or comparable expression
+	 */
+	public OutType notBetween(InType lowerBound, InType upperBound) {
+		return toApiSpecificExpression(unresolvedCall(
+			NOT_BETWEEN,
+			toExpr(),
+			objectToExpression(lowerBound),
+			objectToExpression(upperBound)));
+	}
+
+	/**
+	 * Ternary conditional operator that decides which of two other expressions should be evaluated
+	 * based on an evaluated boolean condition.
+	 *
+	 * <p>e.g. lit(42).isGreater(5).then("A", "B") leads to "A"
+	 *
+	 * @param ifTrue expression to be evaluated if condition holds
+	 * @param ifFalse expression to be evaluated if condition does not hold
+	 */
+	public OutType then(InType ifTrue, InType ifFalse) {
+		return toApiSpecificExpression(unresolvedCall(
+			IF,
+			toExpr(),
+			objectToExpression(ifTrue),
+			objectToExpression(ifFalse)));
+	}
+
+	/**
+	 * Returns true if the given expression is null.
+	 */
+	public OutType isNull() {
+		return toApiSpecificExpression(unresolvedCall(IS_NULL, toExpr()));
+	}
+
+	/**
+	 * Returns true if the given expression is not null.
+	 */
+	public OutType isNotNull() {
+		return toApiSpecificExpression(unresolvedCall(IS_NOT_NULL, toExpr()));
+	}
+
+	/**
+	 * Returns true if given boolean expression is true. False otherwise (for null and false).
+	 */
+	public OutType isTrue() {
+		return toApiSpecificExpression(unresolvedCall(IS_TRUE, toExpr()));
+	}
+
+	/**
+	 * Returns true if given boolean expression is false. False otherwise (for null and true).
+	 */
+	public OutType isFalse() {
+		return toApiSpecificExpression(unresolvedCall(IS_FALSE, toExpr()));
+	}
+
+	/**
+	 * Returns true if given boolean expression is not true (for null and false). False otherwise.
+	 */
+	public OutType isNotTrue() {
+		return toApiSpecificExpression(unresolvedCall(IS_NOT_TRUE, toExpr()));
+	}
+
+	/**
+	 * Returns true if given boolean expression is not false (for null and true). False otherwise.
+	 */
+	public OutType isNotFalse() {
+		return toApiSpecificExpression(unresolvedCall(IS_NOT_FALSE, toExpr()));
+	}
+
+	/**
+	 * Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), declares that an
+	 * aggregation function is only applied on distinct input values.
+	 *
+	 * <p>For example:
+	 * <pre>
+	 * {@code
+	 * orders
+	 *  .groupBy($("a"))
+	 *  .select($("a"), $("b").sum().distinct().as("d"))
+	 * }
+	 * </pre>
+	 */
+	public OutType distinct() {
+		return toApiSpecificExpression(unresolvedCall(DISTINCT, toExpr()));
+	}
+
+	/**
+	 * Returns the sum of the numeric field across all input values.
+	 * If all values are null, null is returned.
+	 */
+	public OutType sum() {
+		return toApiSpecificExpression(unresolvedCall(SUM, toExpr()));
+	}
+
+	/**
+	 * Returns the sum of the numeric field across all input values.
+	 * If all values are null, 0 is returned.
+	 */
+	public OutType sum0() {
+		return toApiSpecificExpression(unresolvedCall(SUM0, toExpr()));
+	}
+
+	/**
+	 * Returns the minimum value of field across all input values.
+	 */
+	public OutType min() {
+		return toApiSpecificExpression(unresolvedCall(MIN, toExpr()));
+	}
+
+	/**
+	 * Returns the maximum value of field across all input values.
+	 */
+	public OutType max() {
+		return toApiSpecificExpression(unresolvedCall(MAX, toExpr()));
+	}
+
+	/**
+	 * Returns the number of input rows for which the field is not null.
+	 */
+	public OutType count() {
+		return toApiSpecificExpression(unresolvedCall(COUNT, toExpr()));
+	}
+
+	/**
+	 * Returns the average (arithmetic mean) of the numeric field across all input values.
+	 */
+	public OutType avg() {
+		return toApiSpecificExpression(unresolvedCall(AVG, toExpr()));
+	}
+
+	/**
+	 * Returns the population standard deviation of an expression (the square root of varPop()).
+	 */
+	public OutType stddevPop() {
+		return toApiSpecificExpression(unresolvedCall(STDDEV_POP, toExpr()));
+	}
+
+	/**
+	 * Returns the sample standard deviation of an expression (the square root of varSamp()).
+	 */
+	public OutType stddevSamp() {
+		return toApiSpecificExpression(unresolvedCall(STDDEV_SAMP, toExpr()));
+	}
+
+	/**
+	 * Returns the population variance of an expression.
+	 */
+	public OutType varPop() {
+		return toApiSpecificExpression(unresolvedCall(VAR_POP, toExpr()));
+	}
+
+	/**
+	 * Returns the sample variance of a given expression.
+	 */
+	public OutType varSamp() {
+		return toApiSpecificExpression(unresolvedCall(VAR_SAMP, toExpr()));
+	}
+
+	/**
+	 * Returns multiset aggregate of a given expression.
+	 */
+	public OutType collect() {
+		return toApiSpecificExpression(unresolvedCall(COLLECT, toExpr()));
+	}
+
+	/**
+	 * Converts a value to a given data type.
+	 *
+	 * <p>e.g. lit("42").cast(DataTypes.INT()) leads to 42.
+	 */
+	public OutType cast(DataType toType) {
+		return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), typeLiteral(toType)));
+	}
+
+	/**
+	 * @deprecated This method will be removed in future versions as it uses the old type system. It
+	 *             is recommended to use {@link #cast(DataType)} instead which uses the new type system
+	 *             based on {@link org.apache.flink.table.api.DataTypes}. Please make sure to use either the old
+	 *             or the new type system consistently to avoid unintended behavior. See the website documentation
+	 *             for more information.
+	 */
+	@Deprecated
+	public OutType cast(TypeInformation<?> toType) {
+		return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), typeLiteral(fromLegacyInfoToDataType(toType))));
+	}
+
+	/**
+	 * Specifies ascending order of an expression i.e. a field for an orderBy call.
+	 */
+	public OutType asc() {
+		return toApiSpecificExpression(unresolvedCall(ORDER_ASC, toExpr()));
+	}
+
+	/**
+	 * Specifies descending order of an expression i.e. a field for an orderBy call.
+	 */
+	public OutType desc() {
+		return toApiSpecificExpression(unresolvedCall(ORDER_DESC, toExpr()));
+	}
+
+	/**
+	 * Returns true if an expression exists in a given list of expressions. This is a shorthand
+	 * for multiple OR conditions.
+	 *
+	 * <p>If the testing set contains null, the result will be null if the element can not be found
+	 * and true if it can be found. If the element is null, the result is always null.
+	 *
+	 * <p>e.g. lit("42").in(1, 2, 3) leads to false.
+	 */
+	@SafeVarargs
+	public final OutType in(InType... elements) {
+		// IN(<this>, <elements>...): the tested expression is always the first argument.
+		final Stream<Expression> self = Stream.of(toExpr());
+		final Stream<Expression> candidates = Arrays.stream(elements).map(ApiExpressionUtils::objectToExpression);
+		final Expression[] operands = Stream.concat(self, candidates).toArray(Expression[]::new);
+		return toApiSpecificExpression(unresolvedCall(IN, operands));
+	}
+
+	/**
+	 * Returns true if an expression exists in a given table sub-query. The sub-query table
+	 * must consist of one column. This column must have the same data type as the expression.
+	 *
+	 * <p>Note: This operation is not supported in a streaming environment yet.
+	 */
+	public OutType in(Table table) {
+		return toApiSpecificExpression(unresolvedCall(IN, toExpr(), tableRef(table.toString(), table)));
+	}
+
+	/**
+	 * Returns the start time (inclusive) of a window when applied on a window reference.
+	 */
+	public OutType start() {
+		return toApiSpecificExpression(unresolvedCall(WINDOW_START, toExpr()));
+	}
+
+	/**
+	 * Returns the end time (exclusive) of a window when applied on a window reference.
+	 *
+	 * <p>e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000.
+	 */
+	public OutType end() {
+		return toApiSpecificExpression(unresolvedCall(WINDOW_END, toExpr()));
+	}
+
+	/**
+	 * Calculates the remainder of dividing the given number by another one.
+	 */
+	public OutType mod(InType other) {
+		return toApiSpecificExpression(unresolvedCall(MOD, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Calculates Euler's number raised to the given power.
+	 */
+	public OutType exp() {
+		return toApiSpecificExpression(unresolvedCall(EXP, toExpr()));
+	}
+
+	/**
+	 * Calculates the base 10 logarithm of the given value.
+	 */
+	public OutType log10() {
+		return toApiSpecificExpression(unresolvedCall(LOG10, toExpr()));
+	}
+
+	/**
+	 * Calculates the base 2 logarithm of the given value.
+	 */
+	public OutType log2() {
+		return toApiSpecificExpression(unresolvedCall(LOG2, toExpr()));
+	}
+
+	/**
+	 * Calculates the natural logarithm of the given value.
+	 */
+	public OutType ln() {
+		return toApiSpecificExpression(unresolvedCall(LN, toExpr()));
+	}
+
+	/**
+	 * Calculates the natural logarithm of the given value.
+	 */
+	public OutType log() {
+		return toApiSpecificExpression(unresolvedCall(LOG, toExpr()));
+	}
+
+	/**
+	 * Calculates the logarithm of the given value to the given base.
+	 */
+	public OutType log(InType base) {
+		return toApiSpecificExpression(unresolvedCall(LOG, objectToExpression(base), toExpr()));
+	}
+
+	/**
+	 * Calculates the given number raised to the power of the other value.
+	 */
+	public OutType power(InType other) {
+		return toApiSpecificExpression(unresolvedCall(POWER, toExpr(), objectToExpression(other)));
+	}
+
+	/**
+	 * Calculates the hyperbolic cosine of a given value.
+	 */
+	public OutType cosh() {
+		return toApiSpecificExpression(unresolvedCall(COSH, toExpr()));
+	}
+
+	/**
+	 * Calculates the square root of a given value.
+	 */
+	public OutType sqrt() {
+		return toApiSpecificExpression(unresolvedCall(SQRT, toExpr()));
+	}
+
+	/**
+	 * Calculates the absolute value of the given value.
+	 */
+	public OutType abs() {
+		return toApiSpecificExpression(unresolvedCall(ABS, toExpr()));
+	}
+
+	/**
+	 * Calculates the largest integer less than or equal to a given number.
+	 */
+	public OutType floor() {
+		return toApiSpecificExpression(unresolvedCall(FLOOR, toExpr()));
+	}
+
+	/**
+	 * Calculates the hyperbolic sine of a given value.
+	 */
+	public OutType sinh() {
+		return toApiSpecificExpression(unresolvedCall(SINH, toExpr()));
+	}
+
+	/**
+	 * Calculates the smallest integer greater than or equal to a given number.
+	 */
+	public OutType ceil() {
+		return toApiSpecificExpression(unresolvedCall(CEIL, toExpr()));
+	}
+
+	/**
+	 * Calculates the sine of a given number.
+	 */
+	public OutType sin() {
+		return toApiSpecificExpression(unresolvedCall(SIN, toExpr()));
+	}
+
+	/**
+	 * Calculates the cosine of a given number.
+	 */
+	public OutType cos() {
+		return toApiSpecificExpression(unresolvedCall(COS, toExpr()));
+	}
+
+	/**
+	 * Calculates the tangent of a given number.
+	 */
+	public OutType tan() {
+		return toApiSpecificExpression(unresolvedCall(TAN, toExpr()));
+	}
+
+	/**
+	 * Calculates the cotangent of a given number.
+	 */
+	public OutType cot() {
+		return toApiSpecificExpression(unresolvedCall(COT, toExpr()));
+	}
+
+	/**
+	 * Calculates the arc sine of a given number.
+	 */
+	public OutType asin() {
+		return toApiSpecificExpression(unresolvedCall(ASIN, toExpr()));
+	}
+
+	/**
+	 * Calculates the arc cosine of a given number.
+	 */
+	public OutType acos() {
+		return toApiSpecificExpression(unresolvedCall(ACOS, toExpr()));
+	}
+
+	/**
+	 * Calculates the arc tangent of a given number.
+	 */
+	public OutType atan() {
+		return toApiSpecificExpression(unresolvedCall(ATAN, toExpr()));
+	}
+
+	/**
+	 * Calculates the hyperbolic tangent of a given number.
+	 */
+	public OutType tanh() {
+		return toApiSpecificExpression(unresolvedCall(TANH, toExpr()));
+	}
+
+	/**
+	 * Converts numeric from radians to degrees.
+	 */
+	public OutType degrees() {
+		return toApiSpecificExpression(unresolvedCall(DEGREES, toExpr()));
+	}
+
+	/**
+	 * Converts numeric from degrees to radians.
+	 */
+	public OutType radians() {
+		return toApiSpecificExpression(unresolvedCall(RADIANS, toExpr()));
+	}
+
+	/**
+	 * Calculates the signum of a given number.
+	 */
+	public OutType sign() {
+		return toApiSpecificExpression(unresolvedCall(SIGN, toExpr()));
+	}
+
+	/**
+	 * Rounds the given number to the given number of places right of the decimal point.
+	 */
+	public OutType round(InType places) {
+		return toApiSpecificExpression(unresolvedCall(ROUND, toExpr(), objectToExpression(places)));
+	}
+
+	/**
+	 * Returns a string representation of an integer numeric value in binary format. Returns null if
+	 * numeric is null. E.g. "4" leads to "100", "12" leads to "1100".
+	 */
+	public OutType bin() {
+		return toApiSpecificExpression(unresolvedCall(BIN, toExpr()));
+	}
+
+	/**
+	 * Returns a string representation of an integer numeric value or a string in hex format. Returns
+	 * null if numeric or string is null.
+	 *
+	 * <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads
+	 * to "68656c6c6f2c776f726c64".
+	 */
+	public OutType hex() {
+		return toApiSpecificExpression(unresolvedCall(HEX, toExpr()));
+	}
+
+	/**
+	 * Returns a number truncated to n decimal places.
+	 * If n is 0, the result has no decimal point or fractional part.
+	 * n can be negative to cause n digits left of the decimal point of the value to become zero.
+	 * E.g. truncate(42.345, 2) leads to 42.34.
+	 */
+	public OutType truncate(InType n) {
+		return toApiSpecificExpression(unresolvedCall(TRUNCATE, toExpr(), objectToExpression(n)));
+	}
+
+	/**
+	 * Returns a number truncated to 0 decimal places.
+	 * E.g. truncate(42.345) leads to 42.0.
+	 */
+	public OutType truncate() {
+		return toApiSpecificExpression(unresolvedCall(TRUNCATE, toExpr()));
+	}
+
+	// String operations
+
+	/**
+	 * Creates a substring of the given string at given index for a given length.
+	 *
+	 * @param beginIndex first character of the substring (starting at 1, inclusive)
+	 * @param length number of characters of the substring
+	 */
+	public OutType substring(InType beginIndex, InType length) {
+		return toApiSpecificExpression(unresolvedCall(SUBSTRING, toExpr(), objectToExpression(beginIndex), objectToExpression(length)));
+	}
+
+	/**
+	 * Creates a substring of the given string beginning at the given index to the end.
+	 *
+	 * @param beginIndex first character of the substring (starting at 1, inclusive)
+	 */
+	public OutType substring(InType beginIndex) {
+		return toApiSpecificExpression(unresolvedCall(SUBSTRING, toExpr(), objectToExpression(beginIndex)));
+	}
+
+	/**
+	 * Removes leading space characters from the given string.
+	 */
+	public OutType trimLeading() {
+		return toApiSpecificExpression(unresolvedCall(
+			TRIM,
+			valueLiteral(true),
+			valueLiteral(false),
+			valueLiteral(" "),
+			toExpr()));
+	}
+
+	/**
+	 * Removes leading characters from the given string.
+	 *
+	 * @param character string containing the character
+	 */
+	public OutType trimLeading(InType character) {
+		return toApiSpecificExpression(unresolvedCall(
+			TRIM,
+			valueLiteral(true),
+			valueLiteral(false),
+			objectToExpression(character),
+			toExpr()));
+	}
+
+	/**
+	 * Removes trailing space characters from the given string.
+	 */
+	public OutType trimTrailing() {
+		return toApiSpecificExpression(unresolvedCall(
+			TRIM,
+			valueLiteral(false),
+			valueLiteral(true),
+			valueLiteral(" "),
+			toExpr()));
+	}
+
+	/**
+	 * Removes trailing characters from the given string.
+	 *
+	 * @param character string containing the character
+	 */
+	public OutType trimTrailing(InType character) {
+		return toApiSpecificExpression(unresolvedCall(
+			TRIM,
+			valueLiteral(false),
+			valueLiteral(true),
+			objectToExpression(character),
+			toExpr()));
+	}
+
+	/**
+	 * Removes leading and trailing space characters from the given string.
+	 */
+	public OutType trim() {
+		return toApiSpecificExpression(unresolvedCall(
+			TRIM,
+			valueLiteral(true),
+			valueLiteral(true),
+			valueLiteral(" "),
+			toExpr()));
+	}
+
+	/**
+	 * Removes leading and trailing characters from the given string.
+	 *
+	 * @param character string containing the character
+	 */
+	public OutType trim(InType character) {
+		return toApiSpecificExpression(unresolvedCall(
+			TRIM,
+			valueLiteral(true),
+			valueLiteral(true),
+			objectToExpression(character),
+			toExpr()));
+	}
+
+	/**
+	 * Returns a new string which replaces all the occurrences of the search target
+	 * with the replacement string (non-overlapping).
+	 */
+	public OutType replace(InType search, InType replacement) {
+		return toApiSpecificExpression(unresolvedCall(REPLACE, toExpr(), objectToExpression(search), objectToExpression(replacement)));
+	}
+
+	/**
+	 * Returns the length of a string.
+	 */
+	public OutType charLength() {
+		return toApiSpecificExpression(unresolvedCall(CHAR_LENGTH, toExpr()));
+	}
+
+	/**
+	 * Returns all of the characters in a string in upper case using the rules of
+	 * the default locale.
+	 */
+	public OutType upperCase() {
+		return toApiSpecificExpression(unresolvedCall(UPPER, toExpr()));
+	}
+
+	/**
+	 * Returns all of the characters in a string in lower case using the rules of
+	 * the default locale.
+	 */
+	public OutType lowerCase() {
+		return toApiSpecificExpression(unresolvedCall(LOWER, toExpr()));
+	}
+
+	/**
+	 * Converts the initial letter of each word in a string to uppercase.
+	 * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace.
+	 */
+	public OutType initCap() {
+		return toApiSpecificExpression(unresolvedCall(INIT_CAP, toExpr()));
+	}
+
+	/**
+	 * Returns true, if a string matches the specified LIKE pattern.
+	 *
+	 * <p>e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
+	 */
+	public OutType like(InType pattern) {
+		return toApiSpecificExpression(unresolvedCall(LIKE, toExpr(), objectToExpression(pattern)));
+	}
+
+	/**
+	 * Returns true, if a string matches the specified SQL regex pattern.
+	 *
+	 * <p>e.g. "A+" matches all strings that consist of at least one A
+	 */
+	public OutType similar(InType pattern) {
+		return toApiSpecificExpression(unresolvedCall(SIMILAR, toExpr(), objectToExpression(pattern)));
+	}
+
+	/**
+	 * Returns the position of a string in another string starting at 1.
+	 * Returns 0 if the string could not be found.
+	 *
+	 * <p>e.g. lit("a").position("bbbbba") leads to 6
+	 */
+	public OutType position(InType haystack) {
+		return toApiSpecificExpression(unresolvedCall(POSITION, toExpr(), objectToExpression(haystack)));
+	}
+
+	/**
+	 * Returns a string left-padded with the given pad string to a length of len characters. If
+	 * the string is longer than len, the return value is shortened to len characters.
+	 *
+	 * <p>e.g. lit("hi").lpad(4, "??") returns "??hi",  lit("hi").lpad(1, "??") returns "h"
+	 */
+	public OutType lpad(InType len, InType pad) {
+		return toApiSpecificExpression(unresolvedCall(LPAD, toExpr(), objectToExpression(len), objectToExpression(pad)));
+	}
+
+	/**
+	 * Returns a string right-padded with the given pad string to a length of len characters. If
+	 * the string is longer than len, the return value is shortened to len characters.
+	 *
+	 * <p>e.g. lit("hi").rpad(4, "??") returns "hi??",  lit("hi").rpad(1, "??") returns "h"
+	 */
+	public OutType rpad(InType len, InType pad) {
+		return toApiSpecificExpression(unresolvedCall(RPAD, toExpr(), objectToExpression(len), objectToExpression(pad)));
+	}
+
+	/**
+	 * Defines an aggregation to be used for a previously specified over window.
+	 *
+	 * <p>For example (shown in Scala expression DSL syntax):
+	 *
+	 * <pre>
+	 * {@code
+	 * table
+	 *   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+	 *   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
+	 * }</pre>
+	 */
+	public OutType over(InType alias) {
+		return toApiSpecificExpression(unresolvedCall(OVER, toExpr(), objectToExpression(alias)));
+	}
+
+	/**
+	 * Replaces a substring of string with a string starting at a position (starting at 1).
+	 *
+	 * <p>e.g. lit("xxxxxtest").overlay("xxxx", 6) leads to "xxxxxxxxx"
+	 */
+	public OutType overlay(InType newString, InType starting) {
+		return toApiSpecificExpression(unresolvedCall(
+			OVERLAY,
+			toExpr(),
+			objectToExpression(newString),
+			objectToExpression(starting)));
+	}
+
+	/**
+	 * Replaces a substring of string with a string starting at a position (starting at 1).
+	 * The length specifies how many characters should be removed.
+	 *
+	 * <p>e.g. lit("xxxxxtest").overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
+	 */
+	public OutType overlay(InType newString, InType starting, InType length) {
+		return toApiSpecificExpression(unresolvedCall(
+			OVERLAY,
+			toExpr(),
+			objectToExpression(newString),
+			objectToExpression(starting),
+			objectToExpression(length)));
+	}
+
+	/**
+	 * Returns a string with all substrings that match the regular expression consecutively
+	 * being replaced.
+	 */
+	public OutType regexpReplace(InType regex, InType replacement) {
+		return toApiSpecificExpression(unresolvedCall(
+			REGEXP_REPLACE,
+			toExpr(),
+			objectToExpression(regex),
+			objectToExpression(replacement)));
+	}
+
+	/**
+	 * Returns a string extracted with a specified regular expression and a regex match group
+	 * index.
+	 */
+	public OutType regexpExtract(InType regex, InType extractIndex) {
+		return toApiSpecificExpression(unresolvedCall(
+			REGEXP_EXTRACT,
+			toExpr(),
+			objectToExpression(regex),
+			objectToExpression(extractIndex)));
+	}
+
+	/**
+	 * Returns a string extracted with a specified regular expression.
+	 */
+	public OutType regexpExtract(InType regex) {
+		return toApiSpecificExpression(unresolvedCall(REGEXP_EXTRACT, toExpr(), objectToExpression(regex)));
+	}
+
+	/**
+	 * Returns the base string decoded with base64.
+	 */
+	public OutType fromBase64() {
+		return toApiSpecificExpression(unresolvedCall(FROM_BASE64, toExpr()));
+	}
+
+	/**
+	 * Returns the base64-encoded result of the input string.
+	 */
+	public OutType toBase64() {
+		return toApiSpecificExpression(unresolvedCall(TO_BASE64, toExpr()));
+	}
+
+	/**
+	 * Returns a string with the leading whitespace removed from the given string.
+	 */
+	public OutType ltrim() {
+		return toApiSpecificExpression(unresolvedCall(LTRIM, toExpr()));
+	}
+
+	/**
+	 * Returns a string with the trailing whitespace removed from the given string.
+	 */
+	public OutType rtrim() {
+		return toApiSpecificExpression(unresolvedCall(RTRIM, toExpr()));
+	}
+
+	/**
+	 * Returns a string that repeats the base string n times.
+	 */
+	public OutType repeat(InType n) {
+		return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(), objectToExpression(n)));
+	}
+
+	// Temporal operations
+
+	/**
+	 * Parses a date string in the form "yyyy-MM-dd" to a SQL Date.
+	 */
+	public OutType toDate() {
+		return toApiSpecificExpression(unresolvedCall(
+			CAST,
+			toExpr(),
+			typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE))));
+	}
+
+	/**
+	 * Parses a time string in the form "HH:mm:ss" to a SQL Time.
+	 */
+	public OutType toTime() {
+		return toApiSpecificExpression(unresolvedCall(
+			CAST,
+			toExpr(),
+			typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME))));
+	}
+
+	/**
+	 * Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp.
+	 */
+	public OutType toTimestamp() {
+		return toApiSpecificExpression(unresolvedCall(
+			CAST,
+			toExpr(),
+			typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP))));
+	}
+
+	/**
+	 * Extracts parts of a time point or time interval. Returns the part as a long value.
+	 *
+	 * <p>e.g. lit("2006-06-05").toDate().extract(DAY) leads to 5
+	 */
+	public OutType extract(TimeIntervalUnit timeIntervalUnit) {
+		return toApiSpecificExpression(unresolvedCall(EXTRACT, valueLiteral(timeIntervalUnit), toExpr()));
+	}
+
+	/**
+	 * Rounds down a time point to the given unit.
+	 *
+	 * <p>e.g. lit("12:44:31").toTime().floor(MINUTE) leads to 12:44:00
+	 */
+	public OutType floor(TimeIntervalUnit timeIntervalUnit) {
+		return toApiSpecificExpression(unresolvedCall(FLOOR, valueLiteral(timeIntervalUnit), toExpr()));
+	}
+
+	/**
+	 * Rounds up a time point to the given unit.
+	 *
+	 * <p>e.g. lit("12:44:31").toTime().ceil(MINUTE) leads to 12:45:00
+	 */
+	public OutType ceil(TimeIntervalUnit timeIntervalUnit) {
+		return toApiSpecificExpression(unresolvedCall(
+			CEIL,
+			valueLiteral(timeIntervalUnit),
+			toExpr()));
+	}
+
+	// Advanced type helper functions
+
+	/**
+	 * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
+	 * returns its value.
+	 *
+	 * @param name name of the field (similar to Flink's field expressions)
+	 */
+	public OutType get(String name) {
+		return toApiSpecificExpression(unresolvedCall(GET, toExpr(), valueLiteral(name)));
+	}
+
+	/**
+	 * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
+	 * returns its value.
+	 *
+	 * @param index position of the field
+	 */
+	public OutType get(int index) {
+		return toApiSpecificExpression(unresolvedCall(GET, toExpr(), valueLiteral(index)));
+	}
+
+	/**
+	 * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
+	 * into a flat representation where every subtype is a separate field.
+	 */
+	public OutType flatten() {
+		return toApiSpecificExpression(unresolvedCall(FLATTEN, toExpr()));
+	}
+
+	/**
+	 * Accesses the element of an array or map based on a key or an index (starting at 1).
+	 *
+	 * @param index key or position of the element (array index starting at 1)
+	 */
+	public OutType at(InType index) {
+		return toApiSpecificExpression(unresolvedCall(AT, toExpr(), objectToExpression(index)));
+	}
+
+	/**
+	 * Returns the number of elements of an array or number of entries of a map.
+	 */
+	public OutType cardinality() {
+		return toApiSpecificExpression(unresolvedCall(CARDINALITY, toExpr()));
+	}
+
+	/**
+	 * Returns the sole element of an array with a single element. Returns null if the array is
+	 * empty. Throws an exception if the array has more than one element.
+	 */
+	public OutType element() {
+		return toApiSpecificExpression(unresolvedCall(ARRAY_ELEMENT, toExpr()));
+	}
+
+	// Time definition
+
+	/**
+	 * Declares a field as the rowtime attribute for indicating, accessing, and working in
+	 * Flink's event time.
+	 */
+	public OutType rowtime() {
+		return toApiSpecificExpression(unresolvedCall(ROWTIME, toExpr()));
+	}
+
+	/**
+	 * Declares a field as the proctime attribute for indicating, accessing, and working in
+	 * Flink's processing time.
+	 */
+	public OutType proctime() {
+		return toApiSpecificExpression(unresolvedCall(PROCTIME, toExpr()));
+	}
+
+	/**
+	 * Creates an interval of the given number of years.
+	 *
+	 * <p>The produced expression is of type {@code DataTypes.INTERVAL}
+	 */
+	public OutType year() {
+		return toApiSpecificExpression(toMonthInterval(toExpr(), 12));
+	}
+
+	/**
+	 * Creates an interval of the given number of years.
+	 */
+	public OutType years() {
+		return year();
+	}
+
+	/**
+	 * Creates an interval of the given number of quarters.
+	 */
+	public OutType quarter() {
+		return toApiSpecificExpression(toMonthInterval(toExpr(), 3));
+	}
+
+	/**
+	 * Creates an interval of the given number of quarters.
+	 */
+	public OutType quarters() {
+		return quarter();
+	}
+
+	/**
+	 * Creates an interval of the given number of months.
+	 */
+	public OutType month() {
+		return toApiSpecificExpression(toMonthInterval(toExpr(), 1));
+	}
+
+	/**
+	 * Creates an interval of the given number of months.
+	 */
+	public OutType months() {
+		return month();
+	}
+
+	/**
+	 * Creates an interval of the given number of weeks.
+	 */
+	public OutType week() {
+		return toApiSpecificExpression(toMilliInterval(toExpr(), 7 * MILLIS_PER_DAY));
+	}
+
+	/**
+	 * Creates an interval of the given number of weeks.
+	 */
+	public OutType weeks() {
+		return week();
+	}
+
+	/**
+	 * Creates an interval of the given number of days.
+	 */
+	public OutType day() {
+		return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_DAY));
+	}
+
+	/**
+	 * Creates an interval of the given number of days.
+	 */
+	public OutType days() {
+		return day();
+	}
+
+	/**
+	 * Creates an interval of the given number of hours.
+	 */
+	public OutType hour() {
+		return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_HOUR));
+	}
+
+	/**
+	 * Creates an interval of the given number of hours.
+	 */
+	public OutType hours() {
+		return hour();
+	}
+
+	/**
+	 * Creates an interval of the given number of minutes.
+	 */
+	public OutType minute() {
+		return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_MINUTE));
+	}
+
+	/**
+	 * Creates an interval of the given number of minutes.
+	 */
+	public OutType minutes() {
+		return minute();
+	}
+
+	/**
+	 * Creates an interval of the given number of seconds.
+	 */
+	public OutType second() {
+		return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_SECOND));
+	}
+
+	/**
+	 * Creates an interval of the given number of seconds.
+	 */
+	public OutType seconds() {
+		return second();
+	}
+
+	/**
+	 * Creates an interval of the given number of milliseconds.
+	 */
+	public OutType milli() {
+		return toApiSpecificExpression(toMilliInterval(toExpr(), 1));
+	}
+
+	/**
+	 * Creates an interval of the given number of milliseconds.
+	 */
+	public OutType millis() {
+		return milli();
+	}
+
+	// Hash functions
+
+	/**
+	 * Returns the MD5 hash of the string argument; null if string is null.
+	 *
+	 * @return string of 32 hexadecimal digits or null
+	 */
+	public OutType md5() {
+		return toApiSpecificExpression(unresolvedCall(MD5, toExpr()));
+	}
+
+	/**
+	 * Returns the SHA-1 hash of the string argument; null if string is null.
+	 *
+	 * @return string of 40 hexadecimal digits or null
+	 */
+	public OutType sha1() {
+		return toApiSpecificExpression(unresolvedCall(SHA1, toExpr()));
+	}
+
+	/**
+	 * Returns the SHA-224 hash of the string argument; null if string is null.
+	 *
+	 * @return string of 56 hexadecimal digits or null
+	 */
+	public OutType sha224() {
+		return toApiSpecificExpression(unresolvedCall(SHA224, toExpr()));
+	}
+
+	/**
+	 * Returns the SHA-256 hash of the string argument; null if string is null.
+	 *
+	 * @return string of 64 hexadecimal digits or null
+	 */
+	public OutType sha256() {
+		return toApiSpecificExpression(unresolvedCall(SHA256, toExpr()));
+	}
+
+	/**
+	 * Returns the SHA-384 hash of the string argument; null if string is null.
+	 *
+	 * @return string of 96 hexadecimal digits or null
+	 */
+	public OutType sha384() {
+		return toApiSpecificExpression(unresolvedCall(SHA384, toExpr()));
+	}
+
+	/**
+	 * Returns the SHA-512 hash of the string argument; null if string is null.
+	 *
+	 * @return string of 128 hexadecimal digits or null
+	 */
+	public OutType sha512() {
+		return toApiSpecificExpression(unresolvedCall(SHA512, toExpr()));
+	}
+
+	/**
+	 * Returns the hash for the given string expression using the SHA-2 family of hash
+	 * functions (SHA-224, SHA-256, SHA-384, or SHA-512).
+	 *
+	 * @param hashLength bit length of the result (either 224, 256, 384, or 512)
+	 * @return string or null if one of the arguments is null.
+	 */
+	public OutType sha2(InType hashLength) {
+		return toApiSpecificExpression(unresolvedCall(SHA2, toExpr(), objectToExpression(hashLength)));
+	}
+}
+
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 2b545ab..2333ffa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -119,9 +119,7 @@ public class TableImpl implements Table {
 
 	@Override
 	public Table select(Expression... fields) {
-		List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
-			.map(f -> f.accept(lookupResolver))
-			.collect(Collectors.toList());
+		List<Expression> expressionsWithResolvedCalls = preprocessExpressions(fields);
 		CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
 			expressionsWithResolvedCalls
 		);
@@ -464,9 +462,7 @@ public class TableImpl implements Table {
 	}
 
 	private Table addColumnsOperation(boolean replaceIfExist, List<Expression> fields) {
-		List<Expression> expressionsWithResolvedCalls = fields.stream()
-			.map(f -> f.accept(lookupResolver))
-			.collect(Collectors.toList());
+		List<Expression> expressionsWithResolvedCalls = preprocessExpressions(fields);
 		CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
 			expressionsWithResolvedCalls
 		);
@@ -563,6 +559,16 @@ public class TableImpl implements Table {
 		return new TableImpl(tableEnvironment, operation, operationTreeBuilder, lookupResolver);
 	}
 
+	private List<Expression> preprocessExpressions(List<Expression> expressions) {
+		return preprocessExpressions(expressions.toArray(new Expression[0]));
+	}
+
+	private List<Expression> preprocessExpressions(Expression[] expressions) {
+		return Arrays.stream(expressions)
+			.map(f -> f.accept(lookupResolver))
+			.collect(Collectors.toList());
+	}
+
 	private static final class GroupedTableImpl implements GroupedTable {
 
 		private final TableImpl table;
@@ -582,9 +588,7 @@ public class TableImpl implements Table {
 
 		@Override
 		public Table select(Expression... fields) {
-			List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
-				.map(f -> f.accept(table.lookupResolver))
-				.collect(Collectors.toList());
+			List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields);
 			CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
 				expressionsWithResolvedCalls
 			);
@@ -716,7 +720,8 @@ public class TableImpl implements Table {
 
 		@Override
 		public WindowGroupedTable groupBy(Expression... fields) {
-			List<Expression> fieldsWithoutWindow = Arrays.stream(fields)
+			List<Expression> fieldsWithoutWindow = table.preprocessExpressions(fields)
+				.stream()
 				.filter(f -> !window.getAlias().equals(f))
 				.collect(Collectors.toList());
 			if (fields.length != fieldsWithoutWindow.size() + 1) {
@@ -749,9 +754,7 @@ public class TableImpl implements Table {
 
 		@Override
 		public Table select(Expression... fields) {
-			List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
-				.map(f -> f.accept(table.lookupResolver))
-				.collect(Collectors.toList());
+			List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields);
 			CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
 				expressionsWithResolvedCalls
 			);
@@ -816,9 +819,7 @@ public class TableImpl implements Table {
 
 		@Override
 		public Table select(Expression... fields) {
-			List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
-				.map(f -> f.accept(table.lookupResolver))
-				.collect(Collectors.toList());
+			List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields);
 			CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
 				expressionsWithResolvedCalls
 			);
@@ -873,9 +874,7 @@ public class TableImpl implements Table {
 
 		@Override
 		public Table select(Expression... fields) {
-			List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
-				.map(f -> f.accept(table.lookupResolver))
-				.collect(Collectors.toList());
+			List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields);
 			CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
 				expressionsWithResolvedCalls
 			);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index ec57fd5..92f8d83 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.ValidationException;
@@ -31,6 +32,7 @@ import org.apache.flink.table.types.DataType;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Utilities for API-specific {@link Expression}s.
@@ -50,6 +52,24 @@ public final class ApiExpressionUtils {
 		// private
 	}
 
+	public static Expression objectToExpression(Object expression) {
+		if (expression instanceof ApiExpression) {
+			return ((ApiExpression) expression).toExpr();
+		} else if (expression instanceof Expression) {
+			return (Expression) expression;
+		} else {
+			return valueLiteral(expression);
+		}
+	}
+
+	public static Expression unwrapFromApi(Expression expression) {
+		if (expression instanceof ApiExpression) {
+			return ((ApiExpression) expression).toExpr();
+		} else {
+			return expression;
+		}
+	}
+
 	public static LocalReferenceExpression localRef(String name, DataType dataType) {
 		return new LocalReferenceExpression(name, dataType);
 	}
@@ -74,14 +94,17 @@ public final class ApiExpressionUtils {
 			FunctionIdentifier functionIdentifier,
 			FunctionDefinition functionDefinition,
 			Expression... args) {
-		return new UnresolvedCallExpression(functionIdentifier, functionDefinition, Arrays.asList(args));
+		return unresolvedCall(functionIdentifier, functionDefinition, Arrays.asList(args));
 	}
 
 	public static UnresolvedCallExpression unresolvedCall(
 			FunctionIdentifier functionIdentifier,
 			FunctionDefinition functionDefinition,
 			List<Expression> args) {
-		return new UnresolvedCallExpression(functionIdentifier, functionDefinition, args);
+		return new UnresolvedCallExpression(functionIdentifier, functionDefinition,
+			args.stream()
+				.map(ApiExpressionUtils::unwrapFromApi)
+				.collect(Collectors.toList()));
 	}
 
 	public static UnresolvedCallExpression unresolvedCall(FunctionDefinition functionDefinition, Expression... args) {
@@ -89,7 +112,11 @@ public final class ApiExpressionUtils {
 	}
 
 	public static UnresolvedCallExpression unresolvedCall(FunctionDefinition functionDefinition, List<Expression> args) {
-		return new UnresolvedCallExpression(functionDefinition, args);
+		return new UnresolvedCallExpression(
+			functionDefinition,
+			args.stream()
+				.map(ApiExpressionUtils::unwrapFromApi)
+				.collect(Collectors.toList()));
 	}
 
 	public static TableReferenceExpression tableRef(String name, Table table) {
@@ -101,7 +128,11 @@ public final class ApiExpressionUtils {
 	}
 
 	public static LookupCallExpression lookupCall(String name, Expression... args) {
-		return new LookupCallExpression(name, Arrays.asList(args));
+		return new LookupCallExpression(
+			name,
+			Arrays.stream(args)
+				.map(ApiExpressionUtils::unwrapFromApi)
+				.collect(Collectors.toList()));
 	}
 
 	public static Expression toMonthInterval(Expression e, int multiplier) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
index d30efd5..ef0663a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -43,7 +42,7 @@ public final class LookupCallExpression implements Expression {
 
 	LookupCallExpression(String unresolvedFunction, List<Expression> args) {
 		this.unresolvedName = Preconditions.checkNotNull(unresolvedFunction);
-		this.args = Collections.unmodifiableList(new ArrayList<>(Preconditions.checkNotNull(args)));
+		this.args = Collections.unmodifiableList(Preconditions.checkNotNull(args));
 	}
 
 	public String getUnresolvedName() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
index 7fdf57d..a280d0e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -61,8 +60,7 @@ public final class UnresolvedCallExpression implements Expression {
 			Preconditions.checkNotNull(functionIdentifier, "Function identifier must not be null.");
 		this.functionDefinition =
 			Preconditions.checkNotNull(functionDefinition, "Function definition must not be null.");
-		this.args = Collections.unmodifiableList(
-			new ArrayList<>(Preconditions.checkNotNull(args, "Arguments must not be null.")));
+		this.args = Collections.unmodifiableList(Preconditions.checkNotNull(args, "Arguments must not be null."));
 	}
 
 	UnresolvedCallExpression(
@@ -71,8 +69,7 @@ public final class UnresolvedCallExpression implements Expression {
 		this.functionIdentifier = null;
 		this.functionDefinition =
 			Preconditions.checkNotNull(functionDefinition, "Function definition must not be null.");
-		this.args = Collections.unmodifiableList(
-			new ArrayList<>(Preconditions.checkNotNull(args, "Arguments must not be null.")));
+		this.args = Collections.unmodifiableList(Preconditions.checkNotNull(args, "Arguments must not be null."));
 	}
 
 	public Optional<FunctionIdentifier> getFunctionIdentifier() {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 434697d..59ef852 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -77,6 +77,7 @@ public class ExpressionResolver {
 	 */
 	public static List<ResolverRule> getExpandingResolverRules() {
 		return Arrays.asList(
+			ResolverRules.UNWRAP_API_EXPRESSION,
 			ResolverRules.LOOKUP_CALL_BY_NAME,
 			ResolverRules.FLATTEN_STAR_REFERENCE,
 			ResolverRules.EXPAND_COLUMN_FUNCTIONS);
@@ -87,6 +88,7 @@ public class ExpressionResolver {
 	 */
 	public static List<ResolverRule> getAllResolverRules() {
 		return Arrays.asList(
+			ResolverRules.UNWRAP_API_EXPRESSION,
 			ResolverRules.LOOKUP_CALL_BY_NAME,
 			ResolverRules.FLATTEN_STAR_REFERENCE,
 			ResolverRules.EXPAND_COLUMN_FUNCTIONS,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
index 93471ef..3071fa6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions.resolver;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
@@ -65,6 +66,17 @@ public class LookupCallResolver extends ApiExpressionDefaultVisitor<Expression>
 	}
 
 	@Override
+	public Expression visitNonApiExpression(Expression other) {
+		// LookupCallResolver might be called outside of ExpressionResolver, thus we need to additionally
+		// handle the ApiExpressions here
+		if (other instanceof ApiExpression) {
+			return ((ApiExpression) other).toExpr().accept(this);
+		} else {
+			return defaultMethod(other);
+		}
+	}
+
+	@Override
 	protected Expression defaultMethod(Expression expression) {
 		return expression;
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
index 42dc0757..3acb5af 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
@@ -55,7 +55,7 @@ final class OverWindowResolverRule implements ResolverRule {
 			.collect(Collectors.toList());
 	}
 
-	private class ExpressionResolverVisitor extends RuleExpressionVisitor<Expression> {
+	private static class ExpressionResolverVisitor extends RuleExpressionVisitor<Expression> {
 
 		ExpressionResolverVisitor(ResolutionContext context) {
 			super(context);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index 671b1e1..aab2272 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 
 /**
@@ -62,6 +63,11 @@ public final class ResolverRules {
 	 */
 	public static final ResolverRule QUALIFY_BUILT_IN_FUNCTIONS = new QualifyBuiltInFunctionsRule();
 
+	/**
+	 * Unwraps all {@link ApiExpression}.
+	 */
+	public static final ResolverRule UNWRAP_API_EXPRESSION = new UnwrapApiExpressionRule();
+
 	private ResolverRules() {
 	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java
similarity index 59%
copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java
index 5180d33..1ef374b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java
@@ -16,25 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.api;
+package org.apache.flink.table.expressions.resolver.rules;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
- * Tumbling window on time with alias. Fully specifies a window.
+ * Unwraps all {@link ApiExpression}.
  */
-@PublicEvolving
-public final class TumbleWithSizeOnTimeWithAlias extends GroupWindow {
-
-	private final Expression size;
-
-	TumbleWithSizeOnTimeWithAlias(Expression alias, Expression timeField, Expression size) {
-		super(alias, timeField);
-		this.size = size;
-	}
-
-	public Expression getSize() {
-		return size;
+@Internal
+final class UnwrapApiExpressionRule implements ResolverRule {
+	@Override
+	public List<Expression> apply(
+			List<Expression> expression,
+			ResolutionContext context) {
+		return expression.stream().map(ApiExpressionUtils::unwrapFromApi).collect(Collectors.toList());
 	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
index 4877bc4..3a3b0a0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
@@ -154,7 +155,7 @@ public class FieldInfoUtils {
 	 * used if the input type has a defined field order (tuple, case class, Row) and no of fields
 	 * references a field of the input type.
 	 */
-	public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
+	private static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
 		if (!(ct instanceof TupleTypeInfoBase)) {
 			return false;
 		}
@@ -224,7 +225,10 @@ public class FieldInfoUtils {
 	public static <A> TypeInfoSchema getFieldsInfo(TypeInformation<A> inputType, Expression[] expressions) {
 		validateInputTypeInfo(inputType);
 
-		final List<FieldInfo> fieldInfos = extractFieldInformation(inputType, expressions);
+		final List<FieldInfo> fieldInfos = extractFieldInformation(
+			inputType,
+			Arrays.stream(expressions).map(ApiExpressionUtils::unwrapFromApi).toArray(Expression[]::new)
+		);
 
 		validateNoStarReference(fieldInfos);
 		boolean isRowtimeAttribute = checkIfRowtimeAttribute(fieldInfos);
@@ -244,8 +248,8 @@ public class FieldInfoUtils {
 	}
 
 	private static <A> List<FieldInfo> extractFieldInformation(
-		TypeInformation<A> inputType,
-		Expression[] exprs) {
+			TypeInformation<A> inputType,
+			Expression[] exprs) {
 		final List<FieldInfo> fieldInfos;
 		if (inputType instanceof GenericTypeInfo && inputType.getTypeClass() == Row.class) {
 			throw new ValidationException(
@@ -367,7 +371,7 @@ public class FieldInfoUtils {
 		final TypeInformation<?>[] fieldTypes;
 		if (inputType instanceof CompositeType) {
 			int arity = inputType.getArity();
-			CompositeType ct = (CompositeType<?>) inputType;
+			CompositeType<?> ct = (CompositeType<?>) inputType;
 			fieldTypes = IntStream.range(0, arity).mapToObj(ct::getTypeAt).toArray(TypeInformation[]::new);
 		} else {
 			fieldTypes = new TypeInformation[]{inputType};
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index cdf4042..cb71dc7 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -17,105 +17,82 @@
  */
 package org.apache.flink.table.api
 
-import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort}
-import java.math.{BigDecimal => JBigDecimal}
-import java.sql.{Date, Time, Timestamp}
-import java.time.{LocalDate, LocalDateTime, LocalTime}
-
 import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.internal.BaseExpressions
+import org.apache.flink.table.expressions.ApiExpressionUtils._
 import org.apache.flink.table.expressions._
-import ApiExpressionUtils._
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserDefinedFunctionHelper, _}
 import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.utils.TypeConversions
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Date, Time, Timestamp}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
 
 import _root_.scala.language.implicitConversions
 
 /**
   * These are all the operations that can be used to construct an [[Expression]] AST for
   * expression operations.
-  *
-  * These operations must be kept in sync with the parser in
-  * [[org.apache.flink.table.expressions.ExpressionParser]].
   */
 @PublicEvolving
-trait ImplicitExpressionOperations {
+trait ImplicitExpressionOperations extends BaseExpressions[Expression, Expression] {
   private[flink] def expr: Expression
 
+  override def toExpr: Expression = expr
+
+  override protected def toApiSpecificExpression(expression: Expression): Expression = expression
+
   /**
-    * Enables literals on left side of binary expressions.
-    *
-    * e.g. 12.toExpr % 'a
-    *
-    * @return expression
-    */
-  def toExpr: Expression = expr
+   * Specifies a name for an expression i.e. a field.
+   *
+   * @param name name for one field
+   * @param extraNames additional names if the expression expands to multiple fields
+   * @return field with an alias
+   */
+  def as(name: Symbol, extraNames: Symbol*): Expression = as(name.name, extraNames.map(_.name): _*)
 
   /**
     * Boolean AND in three-valued logic.
     */
-  def && (other: Expression): Expression = unresolvedCall(AND, expr, other)
+  def && (other: Expression): Expression = and(other)
 
   /**
     * Boolean OR in three-valued logic.
     */
-  def || (other: Expression): Expression = unresolvedCall(OR, expr, other)
+  def || (other: Expression): Expression = or(other)
 
   /**
     * Greater than.
     */
-  def > (other: Expression): Expression = unresolvedCall(GREATER_THAN, expr, other)
+  def > (other: Expression): Expression = isGreater(other)
 
   /**
     * Greater than or equal.
     */
-  def >= (other: Expression): Expression = unresolvedCall(GREATER_THAN_OR_EQUAL, expr, other)
+  def >= (other: Expression): Expression = isGreaterOrEqual(other)
 
   /**
     * Less than.
     */
-  def < (other: Expression): Expression = unresolvedCall(LESS_THAN, expr, other)
+  def < (other: Expression): Expression = isLess(other)
 
   /**
     * Less than or equal.
     */
-  def <= (other: Expression): Expression = unresolvedCall(LESS_THAN_OR_EQUAL, expr, other)
+  def <= (other: Expression): Expression = isLessOrEqual(other)
 
   /**
     * Equals.
     */
-  def === (other: Expression): Expression = unresolvedCall(EQUALS, expr, other)
+  def === (other: Expression): Expression = isEqual(other)
 
   /**
     * Not equal.
     */
-  def !== (other: Expression): Expression = unresolvedCall(NOT_EQUALS, expr, other)
-
-  /**
-    * Returns true if the given expression is between lowerBound and upperBound (both inclusive).
-    * False otherwise. The parameters must be numeric types or identical comparable types.
-    *
-    * @param lowerBound numeric or comparable expression
-    * @param upperBound numeric or comparable expression
-    * @return boolean or null
-    */
-  def between(lowerBound: Expression, upperBound: Expression): Expression =
-    unresolvedCall(BETWEEN, expr, lowerBound, upperBound)
-
-  /**
-    * Returns true if the given expression is not between lowerBound and upperBound (both
-    * inclusive). False otherwise. The parameters must be numeric types or identical
-    * comparable types.
-    *
-    * @param lowerBound numeric or comparable expression
-    * @param upperBound numeric or comparable expression
-    * @return boolean or null
-    */
-  def notBetween(lowerBound: Expression, upperBound: Expression): Expression =
-    unresolvedCall(NOT_BETWEEN, expr, lowerBound, upperBound)
+  def !== (other: Expression): Expression = isNotEqual(other)
 
   /**
     * Whether boolean expression is not true; returns null if boolean is null.
@@ -125,7 +102,7 @@ trait ImplicitExpressionOperations {
   /**
     * Returns negative numeric.
     */
-  def unary_- : Expression = unresolvedCall(MINUS_PREFIX, expr)
+  def unary_- : Expression = Expressions.negative(expr)
 
   /**
     * Returns numeric.
@@ -133,54 +110,24 @@ trait ImplicitExpressionOperations {
   def unary_+ : Expression = expr
 
   /**
-    * Returns true if the given expression is null.
-    */
-  def isNull: Expression = unresolvedCall(IS_NULL, expr)
-
-  /**
-    * Returns true if the given expression is not null.
-    */
-  def isNotNull: Expression = unresolvedCall(IS_NOT_NULL, expr)
-
-  /**
-    * Returns true if given boolean expression is true. False otherwise (for null and false).
-    */
-  def isTrue: Expression = unresolvedCall(IS_TRUE, expr)
-
-  /**
-    * Returns true if given boolean expression is false. False otherwise (for null and true).
-    */
-  def isFalse: Expression = unresolvedCall(IS_FALSE, expr)
-
-  /**
-    * Returns true if given boolean expression is not true (for null and false). False otherwise.
-    */
-  def isNotTrue: Expression = unresolvedCall(IS_NOT_TRUE, expr)
-
-  /**
-    * Returns true if given boolean expression is not false (for null and true). False otherwise.
-    */
-  def isNotFalse: Expression = unresolvedCall(IS_NOT_FALSE, expr)
-
-  /**
     * Returns left plus right.
     */
-  def + (other: Expression): Expression = unresolvedCall(PLUS, expr, other)
+  def + (other: Expression): Expression = plus(other)
 
   /**
     * Returns left minus right.
     */
-  def - (other: Expression): Expression = unresolvedCall(MINUS, expr, other)
+  def - (other: Expression): Expression = minus(other)
 
   /**
     * Returns left divided by right.
     */
-  def / (other: Expression): Expression = unresolvedCall(DIVIDE, expr, other)
+  def / (other: Expression): Expression = dividedBy(other)
 
   /**
     * Returns left multiplied by right.
     */
-  def * (other: Expression): Expression = unresolvedCall(TIMES, expr, other)
+  def * (other: Expression): Expression = times(other)
 
   /**
     * Returns the remainder (modulus) of left divided by right.
@@ -194,352 +141,23 @@ trait ImplicitExpressionOperations {
     *
     * e.g. withColumns(1 to 3)
     */
-  def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, other)
-
-  /**
-    * Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), declares that an
-    * aggregation function is only applied on distinct input values.
-    *
-    * For example:
-    *
-    * {{{
-    * orders
-    *   .groupBy('a)
-    *   .select('a, 'b.sum.distinct as 'd)
-    * }}}
-    */
-  def distinct: Expression = unresolvedCall(DISTINCT, expr)
-
-  /**
-    * Returns the sum of the numeric field across all input values.
-    * If all values are null, null is returned.
-    */
-  def sum: Expression = unresolvedCall(SUM, expr)
-
-  /**
-    * Returns the sum of the numeric field across all input values.
-    * If all values are null, 0 is returned.
-    */
-  def sum0: Expression = unresolvedCall(SUM0, expr)
-
-  /**
-    * Returns the minimum value of field across all input values.
-    */
-  def min: Expression = unresolvedCall(MIN, expr)
-
-  /**
-    * Returns the maximum value of field across all input values.
-    */
-  def max: Expression = unresolvedCall(MAX, expr)
-
-  /**
-    * Returns the number of input rows for which the field is not null.
-    */
-  def count: Expression = unresolvedCall(COUNT, expr)
-
-  /**
-    * Returns the average (arithmetic mean) of the numeric field across all input values.
-    */
-  def avg: Expression = unresolvedCall(AVG, expr)
-
-  /**
-    * Returns the population standard deviation of an expression (the square root of varPop()).
-    */
-  def stddevPop: Expression = unresolvedCall(STDDEV_POP, expr)
-
-  /**
-    * Returns the sample standard deviation of an expression (the square root of varSamp()).
-    */
-  def stddevSamp: Expression = unresolvedCall(STDDEV_SAMP, expr)
-
-  /**
-    * Returns the population standard variance of an expression.
-    */
-  def varPop: Expression = unresolvedCall(VAR_POP, expr)
-
-  /**
-    *  Returns the sample variance of a given expression.
-    */
-  def varSamp: Expression = unresolvedCall(VAR_SAMP, expr)
-
-  /**
-    * Returns multiset aggregate of a given expression.
-    */
-  def collect: Expression = unresolvedCall(COLLECT, expr)
-
-  /**
-    * Converts a value to a given data type.
-    *
-    * e.g. "42".cast(DataTypes.INT()) leads to 42.
-    *
-    * @return casted expression
-    */
-  def cast(toType: DataType): Expression =
-    unresolvedCall(CAST, expr, typeLiteral(toType))
-
-  /**
-    * @deprecated This method will be removed in future versions as it uses the old type system. It
-    *             is recommended to use [[cast(DataType)]] instead which uses the new type system
-    *             based on [[DataTypes]]. Please make sure to use either the old or the new type
-    *             system consistently to avoid unintended behavior. See the website documentation
-    *             for more information.
-    */
-  @deprecated
-  def cast(toType: TypeInformation[_]): Expression =
-    unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(toType)))
-
-  /**
-    * Specifies a name for an expression i.e. a field.
-    *
-    * @param name name for one field
-    * @param extraNames additional names if the expression expands to multiple fields
-    * @return field with an alias
-    */
-  def as(name: Symbol, extraNames: Symbol*): Expression =
-    unresolvedCall(
-      AS,
-      expr +: valueLiteral(name.name) +: extraNames.map(name => valueLiteral(name.name)): _*)
-
-  /**
-    * Specifies ascending order of an expression i.e. a field for orderBy call.
-    *
-    * @return ascend expression
-    */
-  def asc: Expression = unresolvedCall(ORDER_ASC, expr)
-
-  /**
-    * Specifies descending order of an expression i.e. a field for orderBy call.
-    *
-    * @return descend expression
-    */
-  def desc: Expression = unresolvedCall(ORDER_DESC, expr)
-
-  /**
-    * Returns true if an expression exists in a given list of expressions. This is a shorthand
-    * for multiple OR conditions.
-    *
-    * If the testing set contains null, the result will be null if the element can not be found
-    * and true if it can be found. If the element is null, the result is always null.
-    *
-    * e.g. "42".in(1, 2, 3) leads to false.
-    */
-  def in(elements: Expression*): Expression = unresolvedCall(IN, expr +: elements: _*)
-
-  /**
-    * Returns true if an expression exists in a given table sub-query. The sub-query table
-    * must consist of one column. This column must have the same data type as the expression.
-    *
-    * Note: This operation is not supported in a streaming environment yet.
-    */
-  def in(table: Table): Expression = unresolvedCall(IN, expr, tableRef(table.toString, table))
-
-  /**
-    * Returns the start time (inclusive) of a window when applied on a window reference.
-    */
-  def start: Expression = unresolvedCall(WINDOW_START, expr)
-
-  /**
-    * Returns the end time (exclusive) of a window when applied on a window reference.
-    *
-    * e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000.
-    */
-  def end: Expression = unresolvedCall(WINDOW_END, expr)
+  def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, objectToExpression(other))
 
   /**
     * Ternary conditional operator that decides which of two other expressions should be
     * based on a evaluated boolean condition.
     *
-    * e.g. (42 > 5).?("A", "B") leads to "A"
+    * e.g. ($"f0" > 5).?("A", "B") leads to "A" if f0 is greater than 5, and to "B" otherwise
     *
     * @param ifTrue expression to be evaluated if condition holds
     * @param ifFalse expression to be evaluated if condition does not hold
     */
   def ?(ifTrue: Expression, ifFalse: Expression): Expression =
-    unresolvedCall(IF, expr, ifTrue, ifFalse)
+    Expressions.ifThenElse(expr, ifTrue, ifFalse).toExpr
 
   // scalar functions
 
   /**
-    * Calculates the remainder of division the given number by another one.
-    */
-  def mod(other: Expression): Expression = unresolvedCall(MOD, expr, other)
-
-  /**
-    * Calculates the Euler's number raised to the given power.
-    */
-  def exp(): Expression = unresolvedCall(EXP, expr)
-
-  /**
-    * Calculates the base 10 logarithm of the given value.
-    */
-  def log10(): Expression = unresolvedCall(LOG10, expr)
-
-  /**
-    * Calculates the base 2 logarithm of the given value.
-    */
-  def log2(): Expression = unresolvedCall(LOG2, expr)
-
-  /**
-    * Calculates the natural logarithm of the given value.
-    */
-  def ln(): Expression = unresolvedCall(LN, expr)
-
-  /**
-    * Calculates the natural logarithm of the given value.
-    */
-  def log(): Expression = unresolvedCall(LOG, expr)
-
-  /**
-    * Calculates the logarithm of the given value to the given base.
-    */
-  def log(base: Expression): Expression = unresolvedCall(LOG, base, expr)
-
-  /**
-    * Calculates the given number raised to the power of the other value.
-    */
-  def power(other: Expression): Expression = unresolvedCall(POWER, expr, other)
-
-  /**
-    * Calculates the hyperbolic cosine of a given value.
-    */
-  def cosh(): Expression = unresolvedCall(COSH, expr)
-
-  /**
-    * Calculates the square root of a given value.
-    */
-  def sqrt(): Expression = unresolvedCall(SQRT, expr)
-
-  /**
-    * Calculates the absolute value of given value.
-    */
-  def abs(): Expression = unresolvedCall(ABS, expr)
-
-  /**
-    * Calculates the largest integer less than or equal to a given number.
-    */
-  def floor(): Expression = unresolvedCall(FLOOR, expr)
-
-  /**
-    * Calculates the hyperbolic sine of a given value.
-    */
-  def sinh(): Expression = unresolvedCall(SINH, expr)
-
-  /**
-    * Calculates the smallest integer greater than or equal to a given number.
-    */
-  def ceil(): Expression = unresolvedCall(CEIL, expr)
-
-  /**
-    * Calculates the sine of a given number.
-    */
-  def sin(): Expression = unresolvedCall(SIN, expr)
-
-  /**
-    * Calculates the cosine of a given number.
-    */
-  def cos(): Expression = unresolvedCall(COS, expr)
-
-  /**
-    * Calculates the tangent of a given number.
-    */
-  def tan(): Expression = unresolvedCall(TAN, expr)
-
-  /**
-    * Calculates the cotangent of a given number.
-    */
-  def cot(): Expression = unresolvedCall(COT, expr)
-
-  /**
-    * Calculates the arc sine of a given number.
-    */
-  def asin(): Expression = unresolvedCall(ASIN, expr)
-
-  /**
-    * Calculates the arc cosine of a given number.
-    */
-  def acos(): Expression = unresolvedCall(ACOS, expr)
-
-  /**
-    * Calculates the arc tangent of a given number.
-    */
-  def atan(): Expression = unresolvedCall(ATAN, expr)
-
-  /**
-    * Calculates the hyperbolic tangent of a given number.
-    */
-  def tanh(): Expression = unresolvedCall(TANH, expr)
-
-  /**
-    * Converts numeric from radians to degrees.
-    */
-  def degrees(): Expression = unresolvedCall(DEGREES, expr)
-
-  /**
-    * Converts numeric from degrees to radians.
-    */
-  def radians(): Expression = unresolvedCall(RADIANS, expr)
-
-  /**
-    * Calculates the signum of a given number.
-    */
-  def sign(): Expression = unresolvedCall(SIGN, expr)
-
-  /**
-    * Rounds the given number to integer places right to the decimal point.
-    */
-  def round(places: Expression): Expression = unresolvedCall(ROUND, expr, places)
-
-  /**
-    * Returns a string representation of an integer numeric value in binary format. Returns null if
-    * numeric is null. E.g. "4" leads to "100", "12" leads to "1100".
-    */
-  def bin(): Expression = unresolvedCall(BIN, expr)
-
-  /**
-    * Returns a string representation of an integer numeric value or a string in hex format. Returns
-    * null if numeric or string is null.
-    *
-    * E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads
-    * to "68656c6c6f2c776f726c64".
-    */
-  def hex(): Expression = unresolvedCall(HEX, expr)
-
-  /**
-    * Returns a number of truncated to n decimal places.
-    * If n is 0,the result has no decimal point or fractional part.
-    * n can be negative to cause n digits left of the decimal point of the value to become zero.
-    * E.g. truncate(42.345, 2) to 42.34.
-    */
-  def truncate(n: Expression): Expression = unresolvedCall(TRUNCATE, expr, n)
-
-  /**
-    * Returns a number of truncated to 0 decimal places.
-    * E.g. truncate(42.345) to 42.0.
-    */
-  def truncate(): Expression = unresolvedCall(TRUNCATE, expr)
-
-  // String operations
-
-  /**
-    * Creates a substring of the given string at given index for a given length.
-    *
-    * @param beginIndex first character of the substring (starting at 1, inclusive)
-    * @param length number of characters of the substring
-    * @return substring
-    */
-  def substring(beginIndex: Expression, length: Expression): Expression =
-    unresolvedCall(SUBSTRING, expr, beginIndex, length)
-
-  /**
-    * Creates a substring of the given string beginning at the given index to the end.
-    *
-    * @param beginIndex first character of the substring (starting at 1, inclusive)
-    * @return substring
-    */
-  def substring(beginIndex: Expression): Expression =
-    unresolvedCall(SUBSTRING, expr, beginIndex)
-
-  /**
     * Removes leading and/or trailing characters from the given string.
     *
     * @param removeLeading if true, remove leading characters (default: true)
@@ -552,324 +170,14 @@ trait ImplicitExpressionOperations {
       removeTrailing: Boolean = true,
       character: Expression = valueLiteral(" "))
     : Expression = {
-    unresolvedCall(TRIM, valueLiteral(removeLeading), valueLiteral(removeTrailing), character, expr)
+    unresolvedCall(
+      TRIM,
+      valueLiteral(removeLeading),
+      valueLiteral(removeTrailing),
+      ApiExpressionUtils.objectToExpression(character),
+      expr)
   }
 
-  /**
-    * Returns a new string which replaces all the occurrences of the search target
-    * with the replacement string (non-overlapping).
-    */
-  def replace(search: Expression, replacement: Expression): Expression =
-    unresolvedCall(REPLACE, expr, search, replacement)
-
-  /**
-    * Returns the length of a string.
-    */
-  def charLength(): Expression = unresolvedCall(CHAR_LENGTH, expr)
-
-  /**
-    * Returns all of the characters in a string in upper case using the rules of
-    * the default locale.
-    */
-  def upperCase(): Expression = unresolvedCall(UPPER, expr)
-
-  /**
-    * Returns all of the characters in a string in lower case using the rules of
-    * the default locale.
-    */
-  def lowerCase(): Expression = unresolvedCall(LOWER, expr)
-
-  /**
-    * Converts the initial letter of each word in a string to uppercase.
-    * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace.
-    */
-  def initCap(): Expression = unresolvedCall(INIT_CAP, expr)
-
-  /**
-    * Returns true, if a string matches the specified LIKE pattern.
-    *
-    * e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
-    */
-  def like(pattern: Expression): Expression = unresolvedCall(LIKE, expr, pattern)
-
-  /**
-    * Returns true, if a string matches the specified SQL regex pattern.
-    *
-    * e.g. "A+" matches all strings that consist of at least one A
-    */
-  def similar(pattern: Expression): Expression = unresolvedCall(SIMILAR, expr, pattern)
-
-  /**
-    * Returns the position of string in an other string starting at 1.
-    * Returns 0 if string could not be found.
-    *
-    * e.g. "a".position("bbbbba") leads to 6
-    */
-  def position(haystack: Expression): Expression = unresolvedCall(POSITION, expr, haystack)
-
-  /**
-    * Returns a string left-padded with the given pad string to a length of len characters. If
-    * the string is longer than len, the return value is shortened to len characters.
-    *
-    * e.g. "hi".lpad(4, '??') returns "??hi",  "hi".lpad(1, '??') returns "h"
-    */
-  def lpad(len: Expression, pad: Expression): Expression = unresolvedCall(LPAD, expr, len, pad)
-
-  /**
-    * Returns a string right-padded with the given pad string to a length of len characters. If
-    * the string is longer than len, the return value is shortened to len characters.
-    *
-    * e.g. "hi".rpad(4, '??') returns "hi??",  "hi".rpad(1, '??') returns "h"
-    */
-  def rpad(len: Expression, pad: Expression): Expression = unresolvedCall(RPAD, expr, len, pad)
-
-  /**
-    * Defines an aggregation to be used for a previously specified over window.
-    *
-    * For example:
-    *
-    * {{{
-    * table
-    *   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
-    *   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
-    * }}}
-    */
-  def over(alias: Expression): Expression = unresolvedCall(OVER, expr, alias)
-
-  /**
-    * Replaces a substring of string with a string starting at a position (starting at 1).
-    *
-    * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
-    */
-  def overlay(newString: Expression, starting: Expression): Expression =
-    unresolvedCall(OVERLAY, expr, newString, starting)
-
-  /**
-    * Replaces a substring of string with a string starting at a position (starting at 1).
-    * The length specifies how many characters should be removed.
-    *
-    * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
-    */
-  def overlay(newString: Expression, starting: Expression, length: Expression): Expression =
-    unresolvedCall(OVERLAY, expr, newString, starting, length)
-
-  /**
-    * Returns a string with all substrings that match the regular expression consecutively
-    * being replaced.
-    */
-  def regexpReplace(regex: Expression, replacement: Expression): Expression =
-    unresolvedCall(REGEXP_REPLACE, expr, regex, replacement)
-
-  /**
-    * Returns a string extracted with a specified regular expression and a regex match group
-    * index.
-    */
-  def regexpExtract(regex: Expression, extractIndex: Expression): Expression =
-    unresolvedCall(REGEXP_EXTRACT, expr, regex, extractIndex)
-
-  /**
-    * Returns a string extracted with a specified regular expression.
-    */
-  def regexpExtract(regex: Expression): Expression =
-    unresolvedCall(REGEXP_EXTRACT, expr, regex)
-
-  /**
-    * Returns the base string decoded with base64.
-    */
-  def fromBase64(): Expression = unresolvedCall(FROM_BASE64, expr)
-
-  /**
-    * Returns the base64-encoded result of the input string.
-    */
-  def toBase64(): Expression = unresolvedCall(TO_BASE64, expr)
-
-  /**
-    * Returns a string that removes the left whitespaces from the given string.
-    */
-  def ltrim(): Expression = unresolvedCall(LTRIM, expr)
-
-  /**
-    * Returns a string that removes the right whitespaces from the given string.
-    */
-  def rtrim(): Expression = unresolvedCall(RTRIM, expr)
-
-  /**
-    * Returns a string that repeats the base string n times.
-    */
-  def repeat(n: Expression): Expression = unresolvedCall(REPEAT, expr, n)
-
-  // Temporal operations
-
-  /**
-    * Parses a date string in the form "yyyy-MM-dd" to a SQL Date.
-    */
-  def toDate: Expression =
-    unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))
-
-  /**
-    * Parses a time string in the form "HH:mm:ss" to a SQL Time.
-    */
-  def toTime: Expression =
-    unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))
-
-  /**
-    * Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp.
-    */
-  def toTimestamp: Expression =
-    unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))
-
-  /**
-    * Extracts parts of a time point or time interval. Returns the part as a long value.
-    *
-    * e.g. "2006-06-05".toDate.extract(DAY) leads to 5
-    */
-  def extract(timeIntervalUnit: TimeIntervalUnit): Expression =
-    unresolvedCall(EXTRACT, valueLiteral(timeIntervalUnit), expr)
-
-  /**
-    * Rounds down a time point to the given unit.
-    *
-    * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
-    */
-  def floor(timeIntervalUnit: TimeIntervalUnit): Expression =
-    unresolvedCall(FLOOR, valueLiteral(timeIntervalUnit), expr)
-
-  /**
-    * Rounds up a time point to the given unit.
-    *
-    * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00
-    */
-  def ceil(timeIntervalUnit: TimeIntervalUnit): Expression =
-    unresolvedCall(CEIL, valueLiteral(timeIntervalUnit), expr)
-
-  // Interval types
-
-  /**
-    * Creates an interval of the given number of years.
-    *
-    * @return interval of months
-    */
-  def year: Expression = toMonthInterval(expr, 12)
-
-  /**
-    * Creates an interval of the given number of years.
-    *
-    * @return interval of months
-    */
-  def years: Expression = year
-
-  /**
-    * Creates an interval of the given number of quarters.
-    *
-    * @return interval of months
-    */
-  def quarter: Expression = toMonthInterval(expr, 3)
-
-  /**
-    * Creates an interval of the given number of quarters.
-    *
-    * @return interval of months
-    */
-  def quarters: Expression = quarter
-
-  /**
-    * Creates an interval of the given number of months.
-    *
-    * @return interval of months
-    */
-  def month: Expression = toMonthInterval(expr, 1)
-
-  /**
-    * Creates an interval of the given number of months.
-    *
-    * @return interval of months
-    */
-  def months: Expression = month
-
-  /**
-    * Creates an interval of the given number of weeks.
-    *
-    * @return interval of milliseconds
-    */
-  def week: Expression = toMilliInterval(expr, 7 * MILLIS_PER_DAY)
-
-  /**
-    * Creates an interval of the given number of weeks.
-    *
-    * @return interval of milliseconds
-    */
-  def weeks: Expression = week
-
-  /**
-    * Creates an interval of the given number of days.
-    *
-    * @return interval of milliseconds
-    */
-  def day: Expression = toMilliInterval(expr, MILLIS_PER_DAY)
-
-  /**
-    * Creates an interval of the given number of days.
-    *
-    * @return interval of milliseconds
-    */
-  def days: Expression = day
-
-  /**
-    * Creates an interval of the given number of hours.
-    *
-    * @return interval of milliseconds
-    */
-  def hour: Expression = toMilliInterval(expr, MILLIS_PER_HOUR)
-
-  /**
-    * Creates an interval of the given number of hours.
-    *
-    * @return interval of milliseconds
-    */
-  def hours: Expression = hour
-
-  /**
-    * Creates an interval of the given number of minutes.
-    *
-    * @return interval of milliseconds
-    */
-  def minute: Expression = toMilliInterval(expr, MILLIS_PER_MINUTE)
-
-  /**
-    * Creates an interval of the given number of minutes.
-    *
-    * @return interval of milliseconds
-    */
-  def minutes: Expression = minute
-
-  /**
-    * Creates an interval of the given number of seconds.
-    *
-    * @return interval of milliseconds
-    */
-  def second: Expression = toMilliInterval(expr, MILLIS_PER_SECOND)
-
-  /**
-    * Creates an interval of the given number of seconds.
-    *
-    * @return interval of milliseconds
-    */
-  def seconds: Expression = second
-
-  /**
-    * Creates an interval of the given number of milliseconds.
-    *
-    * @return interval of milliseconds
-    */
-  def milli: Expression = toMilliInterval(expr, 1)
-
-  /**
-    * Creates an interval of the given number of milliseconds.
-    *
-    * @return interval of milliseconds
-    */
-  def millis: Expression = milli
-
   // Row interval type
 
   /**
@@ -879,121 +187,6 @@ trait ImplicitExpressionOperations {
     */
   def rows: Expression = toRowInterval(expr)
 
-  // Advanced type helper functions
-
-  /**
-    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
-    * returns it's value.
-    *
-    * @param name name of the field (similar to Flink's field expressions)
-    * @return value of the field
-    */
-  def get(name: String): Expression = unresolvedCall(GET, expr, valueLiteral(name))
-
-  /**
-    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
-    * returns it's value.
-    *
-    * @param index position of the field
-    * @return value of the field
-    */
-  def get(index: Int): Expression = unresolvedCall(GET, expr, valueLiteral(index))
-
-  /**
-    * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
-    * into a flat representation where every subtype is a separate field.
-    */
-  def flatten(): Expression = unresolvedCall(FLATTEN, expr)
-
-  /**
-    * Accesses the element of an array or map based on a key or an index (starting at 1).
-    *
-    * @param index key or position of the element (array index starting at 1)
-    * @return value of the element
-    */
-  def at(index: Expression): Expression = unresolvedCall(AT, expr, index)
-
-  /**
-    * Returns the number of elements of an array or number of entries of a map.
-    *
-    * @return number of elements or entries
-    */
-  def cardinality(): Expression = unresolvedCall(CARDINALITY, expr)
-
-  /**
-    * Returns the sole element of an array with a single element. Returns null if the array is
-    * empty. Throws an exception if the array has more than one element.
-    *
-    * @return the first and only element of an array with a single element
-    */
-  def element(): Expression = unresolvedCall(ARRAY_ELEMENT, expr)
-
-  // Time definition
-
-  /**
-    * Declares a field as the rowtime attribute for indicating, accessing, and working in
-    * Flink's event time.
-    */
-  def rowtime: Expression = unresolvedCall(ROWTIME, expr)
-
-  /**
-    * Declares a field as the proctime attribute for indicating, accessing, and working in
-    * Flink's processing time.
-    */
-  def proctime: Expression = unresolvedCall(PROCTIME, expr)
-
-  // Hash functions
-
-  /**
-    * Returns the MD5 hash of the string argument; null if string is null.
-    *
-    * @return string of 32 hexadecimal digits or null
-    */
-  def md5(): Expression = unresolvedCall(MD5, expr)
-
-  /**
-    * Returns the SHA-1 hash of the string argument; null if string is null.
-    *
-    * @return string of 40 hexadecimal digits or null
-    */
-  def sha1(): Expression = unresolvedCall(SHA1, expr)
-
-  /**
-    * Returns the SHA-224 hash of the string argument; null if string is null.
-    *
-    * @return string of 56 hexadecimal digits or null
-    */
-  def sha224(): Expression = unresolvedCall(SHA224, expr)
-
-  /**
-    * Returns the SHA-256 hash of the string argument; null if string is null.
-    *
-    * @return string of 64 hexadecimal digits or null
-    */
-  def sha256(): Expression = unresolvedCall(SHA256, expr)
-
-  /**
-    * Returns the SHA-384 hash of the string argument; null if string is null.
-    *
-    * @return string of 96 hexadecimal digits or null
-    */
-  def sha384(): Expression = unresolvedCall(SHA384, expr)
-
-  /**
-    * Returns the SHA-512 hash of the string argument; null if string is null.
-    *
-    * @return string of 128 hexadecimal digits or null
-    */
-  def sha512(): Expression = unresolvedCall(SHA512, expr)
-
-  /**
-    * Returns the hash for the given string expression using the SHA-2 family of hash
-    * functions (SHA-224, SHA-256, SHA-384, or SHA-512).
-    *
-    * @param hashLength bit length of the result (either 224, 256, 384, or 512)
-    * @return string or null if one of the arguments is null.
-    */
-  def sha2(hashLength: Expression): Expression = unresolvedCall(SHA2, expr, hashLength)
 }
 
 /**
@@ -1109,7 +302,9 @@ trait ImplicitExpressionConversions {
       * Calls a scalar function for the given parameters.
       */
     def apply(params: Expression*): Expression = {
-      unresolvedCall(new ScalarFunctionDefinition(s.getClass.getName, s), params:_*)
+      unresolvedCall(
+        new ScalarFunctionDefinition(s.getClass.getName, s),
+        params.map(ApiExpressionUtils.objectToExpression): _*)
     }
   }
 
@@ -1121,7 +316,9 @@ trait ImplicitExpressionConversions {
     def apply(params: Expression*): Expression = {
       val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
         .getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]])
-      unresolvedCall(new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo), params: _*)
+      unresolvedCall(
+        new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo),
+        params.map(ApiExpressionUtils.objectToExpression): _*)
     }
   }
 
@@ -1149,7 +346,9 @@ trait ImplicitExpressionConversions {
       * Calls an aggregate function for the given parameters.
       */
     def apply(params: Expression*): Expression = {
-      unresolvedCall(createFunctionDefinition(), params: _*)
+      unresolvedCall(
+        createFunctionDefinition(),
+        params.map(ApiExpressionUtils.objectToExpression): _*)
     }
 
     /**
@@ -1160,6 +359,25 @@ trait ImplicitExpressionConversions {
     }
   }
 
+
+  /**
+   * Extends Scala's StringContext with a method for creating an unresolved reference via
+   * string interpolation.
+   */
+  implicit class FieldExpression(val sc: StringContext) {
+
+    /**
+     * Creates an unresolved reference to a table's field.
+     *
+     * Example:
+     *
+     * {{{
+     * tab.select($"key", $"value")
+     * }}}
+     */
+    def $(args: Any*): Expression = unresolvedRef(sc.s(args: _*))
+  }
+
   implicit def tableSymbolToExpression(sym: TableSymbol): Expression =
     valueLiteral(sym)
 
@@ -1169,7 +387,7 @@ trait ImplicitExpressionConversions {
   implicit def scalaRange2RangeExpression(range: Range.Inclusive): Expression = {
     val startExpression = valueLiteral(range.start)
     val endExpression = valueLiteral(range.end)
-    startExpression to endExpression
+    unresolvedCall(RANGE_TO, startExpression, endExpression)
   }
 
   implicit def byte2Literal(b: Byte): Expression = valueLiteral(b)
@@ -1284,47 +502,77 @@ trait ImplicitExpressionConversions {
    * @see TableEnvironment#createTemporaryFunction
    * @see TableEnvironment#createTemporarySystemFunction
    */
-  def call(path: String, params: Expression*): Expression = {
-    lookupCall(path, params: _*)
-  }
+  def call(path: String, params: Expression*): Expression = Expressions.call(path, params: _*)
 
   // ----------------------------------------------------------------------------------------------
   // Implicit expressions in prefix notation
   // ----------------------------------------------------------------------------------------------
 
   /**
+   * Creates a SQL literal.
+   *
+   * The data type is derived from the object's class and its value.
+   *
+   * For example:
+   *
+   *  - `lit(12)` leads to `INT`
+   *  - `lit("abc")` leads to `CHAR(3)`
+   *  - `lit(new java.math.BigDecimal("123.45"))` leads to `DECIMAL(5, 2)`
+   *
+   * See [[org.apache.flink.table.types.utils.ValueDataTypeConverter]] for a list of supported
+   * literal values.
+   */
+  def lit(v: Any): Expression = Expressions.lit(v)
+
+  /**
+   * Creates a SQL literal of a given [[DataType]].
+   *
+   * The method [[lit(Object)]] is preferred as it extracts the [[DataType]]
+   * automatically. The class of `v` must be supported according to the
+   * [[org.apache.flink.table.types.logical.LogicalType#supportsInputConversion(Class)]].
+   */
+  def lit(v: Any, dataType: DataType): Expression = Expressions.lit(v, dataType)
+
+  /**
+   * Returns negative numeric.
+   */
+  def negative(v: Expression): Expression = {
+    Expressions.negative(v)
+  }
+
+  /**
     * Returns the current SQL date in UTC time zone.
     */
   def currentDate(): Expression = {
-    unresolvedCall(CURRENT_DATE)
+    Expressions.currentDate()
   }
 
   /**
     * Returns the current SQL time in UTC time zone.
     */
   def currentTime(): Expression = {
-    unresolvedCall(CURRENT_TIME)
+    Expressions.currentTime()
   }
 
   /**
     * Returns the current SQL timestamp in UTC time zone.
     */
   def currentTimestamp(): Expression = {
-    unresolvedCall(CURRENT_TIMESTAMP)
+    Expressions.currentTimestamp()
   }
 
   /**
     * Returns the current SQL time in local time zone.
     */
   def localTime(): Expression = {
-    unresolvedCall(LOCAL_TIME)
+    Expressions.localTime()
   }
 
   /**
     * Returns the current SQL timestamp in local time zone.
     */
   def localTimestamp(): Expression = {
-    unresolvedCall(LOCAL_TIMESTAMP)
+    Expressions.localTimestamp()
   }
 
   /**
@@ -1342,7 +590,7 @@ trait ImplicitExpressionConversions {
       rightTimePoint: Expression,
       rightTemporal: Expression)
     : Expression = {
-    unresolvedCall(TEMPORAL_OVERLAPS, leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
+    Expressions.temporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
   }
 
   /**
@@ -1360,7 +608,7 @@ trait ImplicitExpressionConversions {
       timestamp: Expression,
       format: Expression)
     : Expression = {
-    unresolvedCall(DATE_FORMAT, timestamp, format)
+    Expressions.dateFormat(timestamp, format)
   }
 
   /**
@@ -1379,49 +627,49 @@ trait ImplicitExpressionConversions {
       timePoint1: Expression,
       timePoint2: Expression)
     : Expression = {
-    unresolvedCall(TIMESTAMP_DIFF, timePointUnit, timePoint1, timePoint2)
+    Expressions.timestampDiff(timePointUnit, timePoint1, timePoint2)
   }
 
   /**
     * Creates an array of literals.
     */
   def array(head: Expression, tail: Expression*): Expression = {
-    unresolvedCall(ARRAY, head +: tail: _*)
+    Expressions.array(head, tail: _*)
   }
 
   /**
     * Creates a row of expressions.
     */
   def row(head: Expression, tail: Expression*): Expression = {
-    unresolvedCall(ROW, head +: tail: _*)
+    Expressions.row(head, tail: _*)
   }
 
   /**
     * Creates a map of expressions.
     */
   def map(key: Expression, value: Expression, tail: Expression*): Expression = {
-    unresolvedCall(MAP, key +: value +: tail: _*)
+    Expressions.map(key, value, tail: _*)
   }
 
   /**
     * Returns a value that is closer than any other value to pi.
     */
   def pi(): Expression = {
-    unresolvedCall(PI)
+    Expressions.pi()
   }
 
   /**
     * Returns a value that is closer than any other value to e.
     */
   def e(): Expression = {
-    unresolvedCall(E)
+    Expressions.e()
   }
 
   /**
     * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
     */
   def rand(): Expression = {
-    unresolvedCall(RAND)
+    Expressions.rand()
   }
 
   /**
@@ -1430,7 +678,7 @@ trait ImplicitExpressionConversions {
     * have same initial seed.
     */
   def rand(seed: Expression): Expression = {
-    unresolvedCall(RAND, seed)
+    Expressions.rand(seed)
   }
 
   /**
@@ -1438,7 +686,7 @@ trait ImplicitExpressionConversions {
     * value (exclusive).
     */
   def randInteger(bound: Expression): Expression = {
-    unresolvedCall(RAND_INTEGER, bound)
+    Expressions.randInteger(bound)
   }
 
   /**
@@ -1447,7 +695,7 @@ trait ImplicitExpressionConversions {
     * of numbers if they have same initial seed and same bound.
     */
   def randInteger(seed: Expression, bound: Expression): Expression = {
-    unresolvedCall(RAND_INTEGER, seed, bound)
+    Expressions.randInteger(seed, bound)
   }
 
   /**
@@ -1455,25 +703,38 @@ trait ImplicitExpressionConversions {
     * Returns NULL if any argument is NULL.
     */
   def concat(string: Expression, strings: Expression*): Expression = {
-    unresolvedCall(CONCAT, string +: strings: _*)
+    Expressions.concat(string, strings: _*)
   }
 
   /**
     * Calculates the arc tangent of a given coordinate.
     */
   def atan2(y: Expression, x: Expression): Expression = {
-    unresolvedCall(ATAN2, y, x)
+    Expressions.atan2(y, x)
   }
 
   /**
     * Returns the string that results from concatenating the arguments and separator.
     * Returns NULL If the separator is NULL.
     *
-    * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
+    * Note: This function does not skip empty strings. However, it does skip any NULL
     * values after the separator argument.
+    * @deprecated use [[ImplicitExpressionConversions.concatWs()]]
     **/
+  @deprecated
   def concat_ws(separator: Expression, string: Expression, strings: Expression*): Expression = {
-    unresolvedCall(CONCAT_WS, separator +: string +: strings: _*)
+    concatWs(separator, string, strings: _*)
+  }
+
+  /**
+   * Returns the string that results from concatenating the arguments and separator.
+   * Returns NULL if the separator is NULL.
+   *
+   * Note: This function does not skip empty strings. However, it does skip any NULL
+   * values after the separator argument.
+   **/
+  def concatWs(separator: Expression, string: Expression, strings: Expression*): Expression = {
+    Expressions.concatWs(separator, string, strings: _*)
   }
 
   /**
@@ -1483,7 +744,7 @@ trait ImplicitExpressionConversions {
     * generator.
     */
   def uuid(): Expression = {
-    unresolvedCall(UUID)
+    Expressions.uuid()
   }
 
   /**
@@ -1492,7 +753,7 @@ trait ImplicitExpressionConversions {
     * e.g. nullOf(DataTypes.INT())
     */
   def nullOf(dataType: DataType): Expression = {
-    valueLiteral(null, dataType)
+    Expressions.nullOf(dataType)
   }
 
   /**
@@ -1503,21 +764,21 @@ trait ImplicitExpressionConversions {
     *             documentation for more information.
     */
   def nullOf(typeInfo: TypeInformation[_]): Expression = {
-    nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo))
+    Expressions.nullOf(typeInfo)
   }
 
   /**
     * Calculates the logarithm of the given value.
     */
   def log(value: Expression): Expression = {
-    unresolvedCall(LOG, value)
+    Expressions.log(value)
   }
 
   /**
     * Calculates the logarithm of the given value to the given base.
     */
   def log(base: Expression, value: Expression): Expression = {
-    unresolvedCall(LOG, base, value)
+    Expressions.log(base, value)
   }
 
   /**
@@ -1531,7 +792,7 @@ trait ImplicitExpressionConversions {
     * @param ifFalse expression to be evaluated if condition does not hold
     */
   def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
-    unresolvedCall(IF, condition, ifTrue, ifFalse)
+    Expressions.ifThenElse(condition, ifTrue, ifFalse)
   }
 
   /**
@@ -1544,7 +805,7 @@ trait ImplicitExpressionConversions {
     * e.g. withColumns('b to 'c) or withColumns('*)
     */
   def withColumns(head: Expression, tail: Expression*): Expression = {
-    unresolvedCall(WITH_COLUMNS, head +: tail: _*)
+    Expressions.withColumns(head, tail: _*)
   }
 
   /**
@@ -1558,6 +819,20 @@ trait ImplicitExpressionConversions {
     * e.g. withoutColumns('b to 'c) or withoutColumns('c)
     */
   def withoutColumns(head: Expression, tail: Expression*): Expression = {
-    unresolvedCall(WITHOUT_COLUMNS, head +: tail: _*)
+    Expressions.withoutColumns(head, tail: _*)
+  }
+
+  /**
+   * Boolean AND in three-valued logic.
+   */
+  def and(predicate0: Expression, predicate1: Expression, predicates: Expression*): Expression = {
+    Expressions.and(predicate0, predicate1, predicates: _*)
+  }
+
+  /**
+   * Boolean OR in three-valued logic.
+   */
+  def or(predicate0: Expression, predicate1: Expression, predicates: Expression*): Expression = {
+    Expressions.or(predicate0, predicate1, predicates: _*)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 06a9b5f..fadf854 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.functions._
 import org.apache.flink.table.planner.expressions.{E => PlannerE, UUID => PlannerUUID}
 import org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THROW_EXCEPTION
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
+import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL}
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
 
 import _root_.scala.collection.JavaConverters._
@@ -151,12 +151,12 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
             expr
 
           case AND =>
-            assert(args.size == 2)
-            And(args.head, args.last)
+            assert(args.size >= 2)
+            args.reduceLeft(And)
 
           case OR =>
-            assert(args.size == 2)
-            Or(args.head, args.last)
+            assert(args.size >= 2)
+            args.reduceLeft(Or)
 
           case NOT =>
             assert(args.size == 1)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index cb9f6b1..7b2c999 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -133,12 +133,12 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
             expr
 
           case AND =>
-            assert(args.size == 2)
-            And(args.head, args.last)
+            assert(args.size >= 2)
+            args.reduceLeft(And)
 
           case OR =>
-            assert(args.size == 2)
-            Or(args.head, args.last)
+            assert(args.size >= 2)
+            args.reduceLeft(Or)
 
           case NOT =>
             assert(args.size == 1)