You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 11:11:05 UTC

[flink] 05/05: [FLINK-13028][table-api-java] Merge flatten and call resolution rule

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 036c5492a9bfe7636d5e7d5eb664af5e77952707
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 15:25:04 2019 +0200

    [FLINK-13028][table-api-java] Merge flatten and call resolution rule
---
 .../expressions/resolver/ExpressionResolver.java   |   3 +-
 .../resolver/rules/ResolveCallByArgumentsRule.java |  94 ++++++++++++-------
 .../resolver/rules/ResolveFlattenCallRule.java     | 101 ---------------------
 .../expressions/resolver/rules/ResolverRules.java  |   5 -
 .../flink/table/api/batch/table/CalcTest.scala     |   1 +
 .../table/validation/CalcValidationTest.scala      |  12 ---
 6 files changed, 62 insertions(+), 154 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 4c1e62d..e4c07fa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -90,8 +90,7 @@ public class ExpressionResolver {
 			ResolverRules.OVER_WINDOWS,
 			ResolverRules.FIELD_RESOLVE,
 			ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
-			ResolverRules.RESOLVE_CALL_BY_ARGUMENTS,
-			ResolverRules.FLATTEN_CALL);
+			ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
 	}
 
 	private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor();
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 73429f4..1f184c8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -38,21 +40,26 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singletonList;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers
  * the output data type. All function calls are resolved {@link CallExpression} after applying this
- * rule except for the special case of {@link BuiltInFunctionDefinitions#FLATTEN}.
+ * rule.
+ *
+ * <p>This rule also resolves {@code flatten()} calls on composite types.
  *
  * <p>If the call expects different types of arguments, but the given arguments have types that can
  * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
- *
- * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
@@ -60,39 +67,27 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 	@Override
 	public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
 		return expression.stream()
-			.map(expr -> expr.accept(new CallArgumentsCastingVisitor(context)))
+			.flatMap(expr -> expr.accept(new ResolvingCallVisitor(context)).stream())
 			.collect(Collectors.toList());
 	}
 
-	private class CallArgumentsCastingVisitor extends RuleExpressionVisitor<Expression> {
+	// --------------------------------------------------------------------------------------------
+
+	private class ResolvingCallVisitor extends RuleExpressionVisitor<List<ResolvedExpression>> {
 
-		CallArgumentsCastingVisitor(ResolutionContext context) {
+		ResolvingCallVisitor(ResolutionContext context) {
 			super(context);
 		}
 
 		@Override
-		public Expression visit(UnresolvedCallExpression unresolvedCall) {
+		public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
 
 			final List<ResolvedExpression> resolvedArgs = unresolvedCall.getChildren().stream()
-				.map(c -> c.accept(this))
-				.map(e -> {
-					// special case: FLATTEN
-					// a call chain `myFunc().flatten().flatten()` is not allowed
-					if (e instanceof UnresolvedCallExpression &&
-							((UnresolvedCallExpression) e).getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
-						throw new ValidationException("Consecutive flattening calls are not allowed.");
-					}
-					if (e instanceof ResolvedExpression) {
-						return (ResolvedExpression) e;
-					}
-					throw new TableException("Unexpected unresolved expression: " + e);
-				})
+				.flatMap(c -> c.accept(this).stream())
 				.collect(Collectors.toList());
 
-			// FLATTEN is a special case and the only call that remains unresolved after this rule
-			// it will be resolved by ResolveFlattenCallRule
 			if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
-				return unresolvedCall.replaceArgs(new ArrayList<>(resolvedArgs));
+				return executeFlatten(resolvedArgs);
 			}
 
 			if (unresolvedCall.getFunctionDefinition() instanceof BuiltInFunctionDefinition) {
@@ -100,13 +95,49 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 					(BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition();
 
 				if (definition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING) {
-					return runTypeInference(
-						unresolvedCall,
-						definition.getTypeInference(),
-						resolvedArgs);
+					return Collections.singletonList(
+						runTypeInference(
+							unresolvedCall,
+							definition.getTypeInference(),
+							resolvedArgs));
 				}
 			}
-			return runLegacyTypeInference(unresolvedCall, resolvedArgs);
+			return Collections.singletonList(
+				runLegacyTypeInference(unresolvedCall, resolvedArgs));
+		}
+
+		@Override
+		protected List<ResolvedExpression> defaultMethod(Expression expression) {
+			if (expression instanceof ResolvedExpression) {
+				return Collections.singletonList((ResolvedExpression) expression);
+			}
+			throw new TableException("Unexpected unresolved expression: " + expression);
+		}
+
+		private List<ResolvedExpression> executeFlatten(List<ResolvedExpression> args) {
+			if (args.size() != 1) {
+				throw new ValidationException("Invalid number of arguments for flattening.");
+			}
+			final ResolvedExpression composite = args.get(0);
+			// TODO support the new type system with ROW and STRUCTURED_TYPE
+			final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(composite.getOutputDataType());
+			if (resultType instanceof CompositeType) {
+				return flattenCompositeType(composite, (CompositeType<?>) resultType);
+			} else {
+				return singletonList(composite);
+			}
+		}
+
+		private List<ResolvedExpression> flattenCompositeType(ResolvedExpression composite, CompositeType<?> resultType) {
+			return IntStream.range(0, resultType.getArity())
+				.mapToObj(idx ->
+					resolutionContext.postResolutionFactory()
+						.get(
+							composite,
+							valueLiteral(resultType.getFieldNames()[idx]),
+							fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
+				)
+				.collect(Collectors.toList());
 		}
 
 		private ResolvedExpression runTypeInference(
@@ -163,11 +194,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 				})
 				.collect(Collectors.toList());
 		}
-
-		@Override
-		protected Expression defaultMethod(Expression expression) {
-			return expression;
-		}
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
deleted file mode 100644
index 53d7750..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.resolver.rules;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.UnresolvedCallExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.util.Collections.singletonList;
-import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
-import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
-
-/**
- * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with resolved calls to {@link BuiltInFunctionDefinitions#GET}
- * for all fields of underlying field of complex type.
- *
- * @see ResolveCallByArgumentsRule
- */
-@Internal
-final class ResolveFlattenCallRule implements ResolverRule {
-
-	@Override
-	public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
-		return expression.stream()
-			.flatMap(expr -> expr.accept(new FlatteningCallVisitor(context)).stream())
-			.collect(Collectors.toList());
-	}
-
-	private class FlatteningCallVisitor extends RuleExpressionVisitor<List<Expression>> {
-
-		FlatteningCallVisitor(ResolutionContext context) {
-			super(context);
-		}
-
-		@Override
-		public List<Expression> visit(UnresolvedCallExpression unresolvedCall) {
-			if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
-				return executeFlatten(unresolvedCall);
-			}
-
-			return singletonList(unresolvedCall);
-		}
-
-		private List<Expression> executeFlatten(UnresolvedCallExpression unresolvedCall) {
-			final Expression composite = unresolvedCall.getChildren().get(0);
-			if (!(composite instanceof ResolvedExpression)) {
-				throw new TableException("Resolved expression expected for flattening.");
-			}
-			final ResolvedExpression resolvedComposite = (ResolvedExpression) composite;
-			final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(resolvedComposite.getOutputDataType());
-			if (resultType instanceof CompositeType) {
-				return flattenCompositeType(resolvedComposite, (CompositeType<?>) resultType);
-			} else {
-				return singletonList(composite);
-			}
-		}
-
-		private List<Expression> flattenCompositeType(ResolvedExpression resolvedComposite, CompositeType<?> resultType) {
-			return IntStream.range(0, resultType.getArity())
-				.mapToObj(idx ->
-					resolutionContext.postResolutionFactory()
-						.get(
-							resolvedComposite,
-							valueLiteral(resultType.getFieldNames()[idx]),
-							fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
-				)
-				.collect(Collectors.toList());
-		}
-
-		@Override
-		protected List<Expression> defaultMethod(Expression expression) {
-			return singletonList(expression);
-		}
-	}
-}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index 915f6a5..671b1e1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -28,11 +28,6 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 public final class ResolverRules {
 
 	/**
-	 * Rule that resolves flatten call. See {@link ResolveFlattenCallRule} for details.
-	 */
-	public static final ResolverRule FLATTEN_CALL = new ResolveFlattenCallRule();
-
-	/**
 	 * Resolves {@link UnresolvedReferenceExpression}. See {@link ReferenceResolverRule} for details.
 	 */
 	public static final ResolverRule FIELD_RESOLVE = new ReferenceResolverRule();
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
index 7f8bc81..b688d4a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.batch.table
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.api.batch.table.CalcTest.{MyHashCode, TestCaseClass, WC, giveMeCaseClass}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.functions.ScalarFunction
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
index c4216e1..b9e9f21 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
@@ -161,16 +161,4 @@ class CalcValidationTest extends TableTestBase {
     util.addTable[(Int, Long, String)]("MyTable", 'int, 'long, 'string)
       .select('int, 'long.log as 'long, 'string)
   }
-
-  @Test
-  def testConsecutiveFlattening(): Unit = {
-    expectedException.expect(classOf[ValidationException])
-    expectedException.expectMessage("Consecutive flattening calls are not allowed.")
-
-    val util = streamTestUtil()
-    util.addTable[(Long, Int)](
-      "MyTable",
-      'tuple)
-    .select('tuple.flatten().flatten())
-  }
 }