You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/17 10:34:07 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #11081: [FLINK-16033][table-api] Introduced Java Table API Expression DSL

dawidwys commented on a change in pull request #11081: [FLINK-16033][table-api] Introduced Java Table API Expression DSL
URL: https://github.com/apache/flink/pull/11081#discussion_r380103016
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
 ##########
 @@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.internal.BaseExpressions;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.TimePointUnit;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.objectToExpression;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+
+/**
+ * Entry point for a Java Table Expression DSL.
+ */
+@PublicEvolving
+public final class Expressions {
+
+	/**
+	 * Creates an unresolved reference to a table's column.
+	 */
+	//CHECKSTYLE.OFF: MethodName
+	public static ApiExpression $(String name) {
+		return new ApiExpression(ApiExpressionUtils.unresolvedRef(name));
+	}
+	//CHECKSTYLE.ON: MethodName
+
+	/**
+	 * Creates a SQL literal. The data type is derived from the Java class.
+	 */
+	public static ApiExpression lit(Object v) {
+		return new ApiExpression(valueLiteral(v));
+	}
+
+	/**
+	 * Creates a SQL literal of a given {@link DataType}.
+	 */
+	public static ApiExpression lit(Object v, DataType dataType) {
+		return new ApiExpression(valueLiteral(v, dataType));
+	}
+
+	/**
+	 * Indicates a range from 'start' to 'end', which can be used in columns
+	 * selection.
+	 *
+	 * @see #withColumns(Object, Object...)
+	 * @see #withoutColumns(Object, Object...)
+	 */
+	public static ApiExpression range(String start, String end) {
+		return apiCall(BuiltInFunctionDefinitions.RANGE_TO, valueLiteral(start), valueLiteral(end));
+	}
+
+	/**
+	 * Indicates an index based range, which can be used in columns selection.
+	 *
+	 * @see #withColumns(Object, Object...)
+	 * @see #withoutColumns(Object, Object...)
+	 */
+	public static ApiExpression range(int start, int end) {
+		return apiCall(BuiltInFunctionDefinitions.RANGE_TO, valueLiteral(start), valueLiteral(end));
+	}
+
+	/**
+	 * Boolean AND in three-valued logic.
+	 */
+	public static ApiExpression and(Object predicate0, Object predicate1, Object... predicates) {
+		return apiCall(BuiltInFunctionDefinitions.AND, Stream.concat(
+			Stream.of(predicate0, predicate1),
+			Stream.of(predicates)
+		).map(ApiExpressionUtils::objectToExpression)
+			.toArray(Expression[]::new));
+	}
+
+	/**
+	 * Boolean OR in three-valued logic.
+	 */
+	public static ApiExpression or(Object predicate0, Object predicate1, Object... predicates) {
+		return apiCall(BuiltInFunctionDefinitions.OR, Stream.concat(
+			Stream.of(predicate0, predicate1),
+			Stream.of(predicates)
+		).map(ApiExpressionUtils::objectToExpression)
+			.toArray(Expression[]::new));
+	}
+
+	/**
+	 * Offset constant to be used in the {@code preceding} clause of unbounded {@code Over} windows. Use this
+	 * constant for a time interval. Unbounded over windows start with the first row of a partition.
+	 */
+	public static final ApiExpression UNBOUNDED_ROW = apiCall(BuiltInFunctionDefinitions.UNBOUNDED_ROW);
+
+	/**
+	 * Offset constant to be used in the {@code preceding} clause of unbounded {@link Over} windows. Use this
+	 * constant for a row-count interval. Unbounded over windows start with the first row of a
+	 * partition.
+	 */
+	public static final ApiExpression UNBOUNDED_RANGE = apiCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE);
+
+	/**
+	 * Offset constant to be used in the {@code following} clause of {@link Over} windows. Use this for setting
+	 * the upper bound of the window to the current row.
+	 */
+	public static final ApiExpression CURRENT_ROW = apiCall(BuiltInFunctionDefinitions.CURRENT_ROW);
+
+	/**
+	 * Offset constant to be used in the {@code following} clause of {@link Over} windows. Use this for setting
+	 * the upper bound of the window to the sort key of the current row, i.e., all rows with the same
+	 * sort key as the current row are included in the window.
+	 */
+	public static final ApiExpression CURRENT_RANGE = apiCall(BuiltInFunctionDefinitions.CURRENT_RANGE);
+
+	/**
+	 * Returns the current SQL date in UTC time zone.
+	 */
+	public static ApiExpression currentDate() {
+		return apiCall(BuiltInFunctionDefinitions.CURRENT_DATE);
+	}
+
+	/**
+	 * Returns the current SQL time in UTC time zone.
+	 */
+	public static ApiExpression currentTime() {
+		return apiCall(BuiltInFunctionDefinitions.CURRENT_TIME);
+	}
+
+	/**
+	 * Returns the current SQL timestamp in UTC time zone.
+	 */
+	public static ApiExpression currentTimestamp() {
+		return apiCall(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP);
+	}
+
+	/**
+	 * Returns the current SQL time in local time zone.
+	 */
+	public static ApiExpression localTime() {
+		return apiCall(BuiltInFunctionDefinitions.LOCAL_TIME);
+	}
+
+	/**
+	 * Returns the current SQL timestamp in local time zone.
+	 */
+	public static ApiExpression localTimestamp() {
+		return apiCall(BuiltInFunctionDefinitions.LOCAL_TIMESTAMP);
+	}
+
+	/**
+	 * Determines whether two anchored time intervals overlap. Time point and temporal are
+	 * transformed into a range defined by two time points (start, end). The function
+	 * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
+	 *
+	 * <p>It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
+	 *
+	 * <p>e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
+	 */
+	public static ApiExpression temporalOverlaps(
+			Object leftTimePoint,
+			Object leftTemporal,
+			Object rightTimePoint,
+			Object rightTemporal) {
+		return apiCall(
+			BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS,
+			objectToExpression(leftTimePoint),
+			objectToExpression(leftTemporal),
+			objectToExpression(rightTimePoint),
+			objectToExpression(rightTemporal));
+	}
+
+	/**
+	 * Formats a timestamp as a string using a specified format.
+	 * The format must be compatible with MySQL's date formatting syntax as used by the
+	 * date_parse function.
+	 *
+	 * <p>For example dataFormat('time, "%Y, %d %M") results in strings formatted as "2017, 05 May".
+	 *
+	 * @param timestamp The timestamp to format as string.
+	 * @param format The format of the string.
+	 * @return The formatted timestamp as string.
+	 */
+	public static ApiExpression dateFormat(
+			Object timestamp,
+			Object format) {
+		return apiCall(
+			BuiltInFunctionDefinitions.DATE_FORMAT,
+			objectToExpression(timestamp),
+			objectToExpression(format));
+	}
+
+	/**
+	 * Returns the (signed) number of {@link TimePointUnit} between timePoint1 and timePoint2.
+	 *
+	 * <p>For example, {@code timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate} leads
+	 * to 3.
+	 *
+	 * @param timePointUnit The unit to compute diff.
+	 * @param timePoint1 The first point in time.
+	 * @param timePoint2 The second point in time.
+	 * @return The number of intervals as integer value.
+	 */
+	public static ApiExpression timestampDiff(
+			TimePointUnit timePointUnit,
+			Object timePoint1,
+			Object timePoint2) {
+		return apiCall(
+			BuiltInFunctionDefinitions.TIMESTAMP_DIFF,
+			valueLiteral(timePointUnit),
+			objectToExpression(timePoint1),
+			objectToExpression(timePoint2));
+	}
+
+	/**
+	 * Creates an array of literals.
+	 */
+	public static ApiExpression array(Object head, Object... tail) {
+		return apiCall(
+			BuiltInFunctionDefinitions.ARRAY,
+			Stream.concat(
+				Stream.of(head),
+				Stream.of(tail)
+			).map(ApiExpressionUtils::objectToExpression)
+				.toArray(Expression[]::new));
+	}
+
+	/**
+	 * Creates a row of expressions.
+	 */
+	public static ApiExpression row(Object head, Object... tail) {
+		return apiCall(BuiltInFunctionDefinitions.ROW, Stream.concat(
+			Stream.of(head),
+			Stream.of(tail)
+		).map(ApiExpressionUtils::objectToExpression)
+			.toArray(Expression[]::new));
+	}
+
+	/**
+	 * Creates a map of expressions.
+	 */
+	public static ApiExpression map(Object key, Object value, Object... tail) {
+		return apiCall(BuiltInFunctionDefinitions.MAP, Stream.concat(
+			Stream.of(key, value),
+			Stream.of(tail)
+		).map(ApiExpressionUtils::objectToExpression)
+			.toArray(Expression[]::new));
+	}
+
+	/**
+	 * Creates an interval of rows.
+	 *
+	 * @see Table#window(GroupWindow)
+	 * @see Table#window(OverWindow...)
+	 */
+	public static ApiExpression rowInterval(Long rows) {
+		return new ApiExpression(valueLiteral(rows));
+	}
+
+	/**
+	 * Returns a value that is closer than any other value to pi.
+	 */
+	public static ApiExpression pi() {
 
 Review comment:
   I think the reason it is not a constant is that `PI()` is usually a function in other RDBMSs:
   https://docs.oracle.com/cd/E17952_01/mysql-5.0-en/mathematical-functions.html
   https://docs.microsoft.com/en-us/sql/t-sql/functions/pi-transact-sql?view=sql-server-ver15
   
   Whereas `UNBOUNDED_RANGE` and the other window-bound constants are really symbols, i.e. part of the OVER clause syntax.
   As far as I can tell, the intention was to stay as close to SQL as possible.
   
   So we have:
   ```
   table.select(pi()) 
   //but
   table.window(Over.orderBy($("h")).preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE).as("w")).select(...)
   ```
   
   which mirrors SQL:
   ```
   SELECT PI() FROM table;
   -- but
   SELECT ... OVER (ORDER BY h RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS w FROM table
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services