You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 11:11:03 UTC
[flink] 03/05: [FLINK-13028][table-api-java] Refactor local over
windows
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8c1968ac2880c3784c6c35ffb819932483208807
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 09:12:41 2019 +0200
[FLINK-13028][table-api-java] Refactor local over windows
---
.../flink/table/expressions/LocalOverWindow.java | 76 ++++++++++++++++++++++
.../table/expressions/ExpressionResolver.java | 21 +++---
.../expressions/rules/OverWindowResolverRule.java | 16 ++---
.../table/expressions/rules/ResolverRule.java | 6 +-
.../flink/table/plan/logical/groupWindows.scala | 15 +----
5 files changed, 98 insertions(+), 36 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
new file mode 100644
index 0000000..45e1a7a
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Local over window created during expression resolution.
+ */
+@Internal
+public final class LocalOverWindow {
+
+ private Expression alias;
+
+ private List<Expression> partitionBy;
+
+ private Expression orderBy;
+
+ private Expression preceding;
+
+ private @Nullable Expression following;
+
+ LocalOverWindow(
+ Expression alias,
+ List<Expression> partitionBy,
+ Expression orderBy,
+ Expression preceding,
+ @Nullable Expression following) {
+ this.alias = alias;
+ this.partitionBy = partitionBy;
+ this.orderBy = orderBy;
+ this.preceding = preceding;
+ this.following = following;
+ }
+
+ public Expression getAlias() {
+ return alias;
+ }
+
+ public List<Expression> getPartitionBy() {
+ return partitionBy;
+ }
+
+ public Expression getOrderBy() {
+ return orderBy;
+ }
+
+ public Expression getPreceding() {
+ return preceding;
+ }
+
+ public Optional<Expression> getFollowing() {
+ return Optional.ofNullable(following);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
index 015751c..ea45f33 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.expressions.rules.ResolverRules;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
@@ -100,13 +99,13 @@ public class ExpressionResolver {
private final Map<String, LocalReferenceExpression> localReferences;
- private final Map<Expression, LogicalOverWindow> overWindows;
+ private final Map<Expression, LocalOverWindow> localOverWindows;
private ExpressionResolver(
TableReferenceLookup tableLookup,
FunctionLookup functionLookup,
FieldReferenceLookup fieldLookup,
- List<OverWindow> overWindows,
+ List<OverWindow> localOverWindows,
List<LocalReferenceExpression> localReferences) {
this.tableLookup = Preconditions.checkNotNull(tableLookup);
this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
@@ -116,7 +115,7 @@ public class ExpressionResolver {
LocalReferenceExpression::getName,
Function.identity()
));
- this.overWindows = prepareOverWindows(overWindows);
+ this.localOverWindows = prepareOverWindows(localOverWindows);
}
/**
@@ -187,11 +186,11 @@ public class ExpressionResolver {
);
}
- private Map<Expression, LogicalOverWindow> prepareOverWindows(List<OverWindow> overWindows) {
+ private Map<Expression, LocalOverWindow> prepareOverWindows(List<OverWindow> overWindows) {
return overWindows.stream()
.map(this::resolveOverWindow)
.collect(Collectors.toMap(
- LogicalOverWindow::alias,
+ LocalOverWindow::getAlias,
Function.identity()
));
}
@@ -262,18 +261,18 @@ public class ExpressionResolver {
}
@Override
- public Optional<LogicalOverWindow> getOverWindow(Expression alias) {
- return Optional.ofNullable(overWindows.get(alias));
+ public Optional<LocalOverWindow> getOverWindow(Expression alias) {
+ return Optional.ofNullable(localOverWindows.get(alias));
}
}
- private LogicalOverWindow resolveOverWindow(OverWindow overWindow) {
- return new LogicalOverWindow(
+ private LocalOverWindow resolveOverWindow(OverWindow overWindow) {
+ return new LocalOverWindow(
overWindow.getAlias(),
prepareExpressions(overWindow.getPartitioning()),
resolveFieldsInSingleExpression(overWindow.getOrder()),
resolveFieldsInSingleExpression(overWindow.getPreceding()),
- overWindow.getFollowing().map(this::resolveFieldsInSingleExpression)
+ overWindow.getFollowing().map(this::resolveFieldsInSingleExpression).orElse(null)
);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
index fcec1ac..3420437 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.LocalOverWindow;
import org.apache.flink.table.expressions.UnresolvedCallExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
import org.apache.flink.table.types.logical.LogicalType;
import java.util.ArrayList;
@@ -68,17 +68,17 @@ final class OverWindowResolverRule implements ResolverRule {
List<Expression> children = unresolvedCall.getChildren();
Expression alias = children.get(1);
- LogicalOverWindow referenceWindow = resolutionContext.getOverWindow(alias)
+ LocalOverWindow referenceWindow = resolutionContext.getOverWindow(alias)
.orElseThrow(() -> new ValidationException("Could not resolve over call."));
Expression following = calculateOverWindowFollowing(referenceWindow);
List<Expression> newArgs = new ArrayList<>(asList(
children.get(0),
- referenceWindow.orderBy(),
- referenceWindow.preceding(),
+ referenceWindow.getOrderBy(),
+ referenceWindow.getPreceding(),
following));
- newArgs.addAll(referenceWindow.partitionBy());
+ newArgs.addAll(referenceWindow.getPartitionBy());
return unresolvedCall(unresolvedCall.getFunctionDefinition(), newArgs.toArray(new Expression[0]));
} else {
@@ -90,9 +90,9 @@ final class OverWindowResolverRule implements ResolverRule {
}
}
- private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
- return referenceWindow.following().orElseGet(() -> {
- WindowKind kind = referenceWindow.preceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
+ private Expression calculateOverWindowFollowing(LocalOverWindow referenceWindow) {
+ return referenceWindow.getFollowing().orElseGet(() -> {
+ WindowKind kind = referenceWindow.getPreceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
if (kind == WindowKind.ROW) {
return unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW);
} else {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
index af51499..802b550 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.FunctionLookup;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionResolver;
+import org.apache.flink.table.expressions.LocalOverWindow;
import org.apache.flink.table.expressions.LocalReferenceExpression;
import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
import java.util.List;
import java.util.Optional;
@@ -73,8 +73,8 @@ public interface ResolverRule {
Optional<LocalReferenceExpression> getLocalReference(String alias);
/**
- * Lookup for over windows.
+ * Access to available local over windows.
*/
- Optional<LogicalOverWindow> getOverWindow(Expression alias);
+ Optional<LocalOverWindow> getOverWindow(Expression alias);
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index 50651e9..02b735e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -18,20 +18,7 @@
package org.apache.flink.table.plan.logical
-import java.util.{Optional, List => JList}
-
-import org.apache.flink.table.expressions.{Expression, PlannerExpression}
-
-// ------------------------------------------------------------------------------------------------
-// Over windows
-// ------------------------------------------------------------------------------------------------
-
-case class LogicalOverWindow(
- alias: Expression,
- partitionBy: JList[Expression],
- orderBy: Expression,
- preceding: Expression,
- following: Optional[Expression])
+import org.apache.flink.table.expressions.PlannerExpression
// ------------------------------------------------------------------------------------------------
// Group windows