You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 11:11:05 UTC
[flink] 05/05: [FLINK-13028][table-api-java] Merge flatten and call resolution rule
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 036c5492a9bfe7636d5e7d5eb664af5e77952707
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 15:25:04 2019 +0200
[FLINK-13028][table-api-java] Merge flatten and call resolution rule
---
.../expressions/resolver/ExpressionResolver.java | 3 +-
.../resolver/rules/ResolveCallByArgumentsRule.java | 94 ++++++++++++-------
.../resolver/rules/ResolveFlattenCallRule.java | 101 ---------------------
.../expressions/resolver/rules/ResolverRules.java | 5 -
.../flink/table/api/batch/table/CalcTest.scala | 1 +
.../table/validation/CalcValidationTest.scala | 12 ---
6 files changed, 62 insertions(+), 154 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 4c1e62d..e4c07fa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -90,8 +90,7 @@ public class ExpressionResolver {
ResolverRules.OVER_WINDOWS,
ResolverRules.FIELD_RESOLVE,
ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
- ResolverRules.RESOLVE_CALL_BY_ARGUMENTS,
- ResolverRules.FLATTEN_CALL);
+ ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
}
private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor();
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 73429f4..1f184c8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.expressions.resolver.rules;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -38,21 +40,26 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.util.Preconditions;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
/**
* This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers
* the output data type. All function calls are resolved {@link CallExpression} after applying this
- * rule except for the special case of {@link BuiltInFunctionDefinitions#FLATTEN}.
+ * rule.
+ *
+ * <p>This rule also resolves {@code flatten()} calls on composite types.
*
* <p>If the call expects different types of arguments, but the given arguments have types that can
* be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
- *
- * @see ResolveFlattenCallRule
*/
@Internal
final class ResolveCallByArgumentsRule implements ResolverRule {
@@ -60,39 +67,27 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
@Override
public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
return expression.stream()
- .map(expr -> expr.accept(new CallArgumentsCastingVisitor(context)))
+ .flatMap(expr -> expr.accept(new ResolvingCallVisitor(context)).stream())
.collect(Collectors.toList());
}
- private class CallArgumentsCastingVisitor extends RuleExpressionVisitor<Expression> {
+ // --------------------------------------------------------------------------------------------
+
+ private class ResolvingCallVisitor extends RuleExpressionVisitor<List<ResolvedExpression>> {
- CallArgumentsCastingVisitor(ResolutionContext context) {
+ ResolvingCallVisitor(ResolutionContext context) {
super(context);
}
@Override
- public Expression visit(UnresolvedCallExpression unresolvedCall) {
+ public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
final List<ResolvedExpression> resolvedArgs = unresolvedCall.getChildren().stream()
- .map(c -> c.accept(this))
- .map(e -> {
- // special case: FLATTEN
- // a call chain `myFunc().flatten().flatten()` is not allowed
- if (e instanceof UnresolvedCallExpression &&
- ((UnresolvedCallExpression) e).getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
- throw new ValidationException("Consecutive flattening calls are not allowed.");
- }
- if (e instanceof ResolvedExpression) {
- return (ResolvedExpression) e;
- }
- throw new TableException("Unexpected unresolved expression: " + e);
- })
+ .flatMap(c -> c.accept(this).stream())
.collect(Collectors.toList());
- // FLATTEN is a special case and the only call that remains unresolved after this rule
- // it will be resolved by ResolveFlattenCallRule
if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
- return unresolvedCall.replaceArgs(new ArrayList<>(resolvedArgs));
+ return executeFlatten(resolvedArgs);
}
if (unresolvedCall.getFunctionDefinition() instanceof BuiltInFunctionDefinition) {
@@ -100,13 +95,49 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
(BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition();
if (definition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING) {
- return runTypeInference(
- unresolvedCall,
- definition.getTypeInference(),
- resolvedArgs);
+ return Collections.singletonList(
+ runTypeInference(
+ unresolvedCall,
+ definition.getTypeInference(),
+ resolvedArgs));
}
}
- return runLegacyTypeInference(unresolvedCall, resolvedArgs);
+ return Collections.singletonList(
+ runLegacyTypeInference(unresolvedCall, resolvedArgs));
+ }
+
+ @Override
+ protected List<ResolvedExpression> defaultMethod(Expression expression) {
+ if (expression instanceof ResolvedExpression) {
+ return Collections.singletonList((ResolvedExpression) expression);
+ }
+ throw new TableException("Unexpected unresolved expression: " + expression);
+ }
+
+ private List<ResolvedExpression> executeFlatten(List<ResolvedExpression> args) {
+ if (args.size() != 1) {
+ throw new ValidationException("Invalid number of arguments for flattening.");
+ }
+ final ResolvedExpression composite = args.get(0);
+ // TODO support the new type system with ROW and STRUCTURED_TYPE
+ final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(composite.getOutputDataType());
+ if (resultType instanceof CompositeType) {
+ return flattenCompositeType(composite, (CompositeType<?>) resultType);
+ } else {
+ return singletonList(composite);
+ }
+ }
+
+ private List<ResolvedExpression> flattenCompositeType(ResolvedExpression composite, CompositeType<?> resultType) {
+ return IntStream.range(0, resultType.getArity())
+ .mapToObj(idx ->
+ resolutionContext.postResolutionFactory()
+ .get(
+ composite,
+ valueLiteral(resultType.getFieldNames()[idx]),
+ fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
+ )
+ .collect(Collectors.toList());
}
private ResolvedExpression runTypeInference(
@@ -163,11 +194,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
})
.collect(Collectors.toList());
}
-
- @Override
- protected Expression defaultMethod(Expression expression) {
- return expression;
- }
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
deleted file mode 100644
index 53d7750..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.resolver.rules;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.UnresolvedCallExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.util.Collections.singletonList;
-import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
-import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
-
-/**
- * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with resolved calls to {@link BuiltInFunctionDefinitions#GET}
- * for all fields of underlying field of complex type.
- *
- * @see ResolveCallByArgumentsRule
- */
-@Internal
-final class ResolveFlattenCallRule implements ResolverRule {
-
- @Override
- public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
- return expression.stream()
- .flatMap(expr -> expr.accept(new FlatteningCallVisitor(context)).stream())
- .collect(Collectors.toList());
- }
-
- private class FlatteningCallVisitor extends RuleExpressionVisitor<List<Expression>> {
-
- FlatteningCallVisitor(ResolutionContext context) {
- super(context);
- }
-
- @Override
- public List<Expression> visit(UnresolvedCallExpression unresolvedCall) {
- if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
- return executeFlatten(unresolvedCall);
- }
-
- return singletonList(unresolvedCall);
- }
-
- private List<Expression> executeFlatten(UnresolvedCallExpression unresolvedCall) {
- final Expression composite = unresolvedCall.getChildren().get(0);
- if (!(composite instanceof ResolvedExpression)) {
- throw new TableException("Resolved expression expected for flattening.");
- }
- final ResolvedExpression resolvedComposite = (ResolvedExpression) composite;
- final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(resolvedComposite.getOutputDataType());
- if (resultType instanceof CompositeType) {
- return flattenCompositeType(resolvedComposite, (CompositeType<?>) resultType);
- } else {
- return singletonList(composite);
- }
- }
-
- private List<Expression> flattenCompositeType(ResolvedExpression resolvedComposite, CompositeType<?> resultType) {
- return IntStream.range(0, resultType.getArity())
- .mapToObj(idx ->
- resolutionContext.postResolutionFactory()
- .get(
- resolvedComposite,
- valueLiteral(resultType.getFieldNames()[idx]),
- fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
- )
- .collect(Collectors.toList());
- }
-
- @Override
- protected List<Expression> defaultMethod(Expression expression) {
- return singletonList(expression);
- }
- }
-}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index 915f6a5..671b1e1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -28,11 +28,6 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
public final class ResolverRules {
/**
- * Rule that resolves flatten call. See {@link ResolveFlattenCallRule} for details.
- */
- public static final ResolverRule FLATTEN_CALL = new ResolveFlattenCallRule();
-
- /**
* Resolves {@link UnresolvedReferenceExpression}. See {@link ReferenceResolverRule} for details.
*/
public static final ResolverRule FIELD_RESOLVE = new ReferenceResolverRule();
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
index 7f8bc81..b688d4a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.batch.table
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.batch.table.CalcTest.{MyHashCode, TestCaseClass, WC, giveMeCaseClass}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
index c4216e1..b9e9f21 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
@@ -161,16 +161,4 @@ class CalcValidationTest extends TableTestBase {
util.addTable[(Int, Long, String)]("MyTable", 'int, 'long, 'string)
.select('int, 'long.log as 'long, 'string)
}
-
- @Test
- def testConsecutiveFlattening(): Unit = {
- expectedException.expect(classOf[ValidationException])
- expectedException.expectMessage("Consecutive flattening calls are not allowed.")
-
- val util = streamTestUtil()
- util.addTable[(Long, Int)](
- "MyTable",
- 'tuple)
- .select('tuple.flatten().flatten())
- }
}