You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 11:11:03 UTC

[flink] 03/05: [FLINK-13028][table-api-java] Refactor local over windows

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8c1968ac2880c3784c6c35ffb819932483208807
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 09:12:41 2019 +0200

    [FLINK-13028][table-api-java] Refactor local over windows
---
 .../flink/table/expressions/LocalOverWindow.java   | 76 ++++++++++++++++++++++
 .../table/expressions/ExpressionResolver.java      | 21 +++---
 .../expressions/rules/OverWindowResolverRule.java  | 16 ++---
 .../table/expressions/rules/ResolverRule.java      |  6 +-
 .../flink/table/plan/logical/groupWindows.scala    | 15 +----
 5 files changed, 98 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
new file mode 100644
index 0000000..45e1a7a
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Local over window created during expression resolution.
+ */
+@Internal
+public final class LocalOverWindow {
+
+	private Expression alias;
+
+	private List<Expression> partitionBy;
+
+	private Expression orderBy;
+
+	private Expression preceding;
+
+	private @Nullable Expression following;
+
+	LocalOverWindow(
+			Expression alias,
+			List<Expression> partitionBy,
+			Expression orderBy,
+			Expression preceding,
+			@Nullable Expression following) {
+		this.alias = alias;
+		this.partitionBy = partitionBy;
+		this.orderBy = orderBy;
+		this.preceding = preceding;
+		this.following = following;
+	}
+
+	public Expression getAlias() {
+		return alias;
+	}
+
+	public List<Expression> getPartitionBy() {
+		return partitionBy;
+	}
+
+	public Expression getOrderBy() {
+		return orderBy;
+	}
+
+	public Expression getPreceding() {
+		return preceding;
+	}
+
+	public Optional<Expression> getFollowing() {
+		return Optional.ofNullable(following);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
index 015751c..ea45f33 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.expressions.rules.ResolverRules;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
@@ -100,13 +99,13 @@ public class ExpressionResolver {
 
 	private final Map<String, LocalReferenceExpression> localReferences;
 
-	private final Map<Expression, LogicalOverWindow> overWindows;
+	private final Map<Expression, LocalOverWindow> localOverWindows;
 
 	private ExpressionResolver(
 			TableReferenceLookup tableLookup,
 			FunctionLookup functionLookup,
 			FieldReferenceLookup fieldLookup,
-			List<OverWindow> overWindows,
+			List<OverWindow> localOverWindows,
 			List<LocalReferenceExpression> localReferences) {
 		this.tableLookup = Preconditions.checkNotNull(tableLookup);
 		this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
@@ -116,7 +115,7 @@ public class ExpressionResolver {
 			LocalReferenceExpression::getName,
 			Function.identity()
 		));
-		this.overWindows = prepareOverWindows(overWindows);
+		this.localOverWindows = prepareOverWindows(localOverWindows);
 	}
 
 	/**
@@ -187,11 +186,11 @@ public class ExpressionResolver {
 			);
 	}
 
-	private Map<Expression, LogicalOverWindow> prepareOverWindows(List<OverWindow> overWindows) {
+	private Map<Expression, LocalOverWindow> prepareOverWindows(List<OverWindow> overWindows) {
 		return overWindows.stream()
 			.map(this::resolveOverWindow)
 			.collect(Collectors.toMap(
-				LogicalOverWindow::alias,
+				LocalOverWindow::getAlias,
 				Function.identity()
 			));
 	}
@@ -262,18 +261,18 @@ public class ExpressionResolver {
 		}
 
 		@Override
-		public Optional<LogicalOverWindow> getOverWindow(Expression alias) {
-			return Optional.ofNullable(overWindows.get(alias));
+		public Optional<LocalOverWindow> getOverWindow(Expression alias) {
+			return Optional.ofNullable(localOverWindows.get(alias));
 		}
 	}
 
-	private LogicalOverWindow resolveOverWindow(OverWindow overWindow) {
-		return new LogicalOverWindow(
+	private LocalOverWindow resolveOverWindow(OverWindow overWindow) {
+		return new LocalOverWindow(
 			overWindow.getAlias(),
 			prepareExpressions(overWindow.getPartitioning()),
 			resolveFieldsInSingleExpression(overWindow.getOrder()),
 			resolveFieldsInSingleExpression(overWindow.getPreceding()),
-			overWindow.getFollowing().map(this::resolveFieldsInSingleExpression)
+			overWindow.getFollowing().map(this::resolveFieldsInSingleExpression).orElse(null)
 		);
 	}
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
index fcec1ac..3420437 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.LocalOverWindow;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.ArrayList;
@@ -68,17 +68,17 @@ final class OverWindowResolverRule implements ResolverRule {
 				List<Expression> children = unresolvedCall.getChildren();
 				Expression alias = children.get(1);
 
-				LogicalOverWindow referenceWindow = resolutionContext.getOverWindow(alias)
+				LocalOverWindow referenceWindow = resolutionContext.getOverWindow(alias)
 					.orElseThrow(() -> new ValidationException("Could not resolve over call."));
 
 				Expression following = calculateOverWindowFollowing(referenceWindow);
 				List<Expression> newArgs = new ArrayList<>(asList(
 					children.get(0),
-					referenceWindow.orderBy(),
-					referenceWindow.preceding(),
+					referenceWindow.getOrderBy(),
+					referenceWindow.getPreceding(),
 					following));
 
-				newArgs.addAll(referenceWindow.partitionBy());
+				newArgs.addAll(referenceWindow.getPartitionBy());
 
 				return unresolvedCall(unresolvedCall.getFunctionDefinition(), newArgs.toArray(new Expression[0]));
 			} else {
@@ -90,9 +90,9 @@ final class OverWindowResolverRule implements ResolverRule {
 			}
 		}
 
-		private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
-			return referenceWindow.following().orElseGet(() -> {
-					WindowKind kind = referenceWindow.preceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
+		private Expression calculateOverWindowFollowing(LocalOverWindow referenceWindow) {
+			return referenceWindow.getFollowing().orElseGet(() -> {
+					WindowKind kind = referenceWindow.getPreceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
 					if (kind == WindowKind.ROW) {
 						return unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW);
 					} else {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
index af51499..802b550 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionResolver;
+import org.apache.flink.table.expressions.LocalOverWindow;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
 import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
 import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 
 import java.util.List;
 import java.util.Optional;
@@ -73,8 +73,8 @@ public interface ResolverRule {
 		Optional<LocalReferenceExpression> getLocalReference(String alias);
 
 		/**
-		 * Lookup for over windows.
+		 * Access to available local over windows.
 		 */
-		Optional<LogicalOverWindow> getOverWindow(Expression alias);
+		Optional<LocalOverWindow> getOverWindow(Expression alias);
 	}
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index 50651e9..02b735e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -18,20 +18,7 @@
 
 package org.apache.flink.table.plan.logical
 
-import java.util.{Optional, List => JList}
-
-import org.apache.flink.table.expressions.{Expression, PlannerExpression}
-
-// ------------------------------------------------------------------------------------------------
-// Over windows
-// ------------------------------------------------------------------------------------------------
-
-case class LogicalOverWindow(
-    alias: Expression,
-    partitionBy: JList[Expression],
-    orderBy: Expression,
-    preceding: Expression,
-    following: Optional[Expression])
+import org.apache.flink.table.expressions.PlannerExpression
 
 // ------------------------------------------------------------------------------------------------
 // Group windows