You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/11/26 14:49:53 UTC
[flink] 06/06: [FLINK-12996][table-common] Simplify type validators
structure
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0af48012ec621f429b3abcfc4ca6e0d13ad44316
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Nov 25 17:10:28 2019 +0100
[FLINK-12996][table-common] Simplify type validators structure
This closes #10312.
---
...peValidator.java => ArgumentTypeValidator.java} | 24 +-
.../types/inference/ConstantArgumentCount.java | 2 +
.../table/types/inference/InputTypeValidators.java | 97 +++---
...alidator.java => AndTypeArgumentValidator.java} | 56 ++--
.../inference/validators/AnyTypeValidator.java | 13 +-
.../validators/CompositeTypeValidator.java | 331 ---------------------
.../validators/ExplicitTypeValidator.java | 44 +--
.../inference/validators/LiteralTypeValidator.java | 15 +-
...Validator.java => OrTypeArgumentValidator.java} | 60 ++--
.../inference/validators/OrTypeInputValidator.java | 179 +++++++++++
.../inference/validators/PassingTypeValidator.java | 13 +-
.../validators/SequenceInputValidator.java | 112 +++++++
.../validators/VaryingSequenceTypeValidator.java | 91 +++---
.../types/inference/InputTypeValidatorsTest.java | 61 ++--
14 files changed, 525 insertions(+), 573 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/SingleInputTypeValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ArgumentTypeValidator.java
similarity index 67%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/SingleInputTypeValidator.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ArgumentTypeValidator.java
index e97f6f7..dfc3614 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/SingleInputTypeValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ArgumentTypeValidator.java
@@ -16,18 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.table.types.inference.validators;
+package org.apache.flink.table.types.inference;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.InputTypeValidator;
+import org.apache.flink.table.functions.FunctionDefinition;
/**
- * Validator that checks a single input type of a function call.
+ * Validator that checks a single input argument type of a function call.
*/
-@Internal
-public interface SingleInputTypeValidator extends InputTypeValidator {
+@PublicEvolving
+public interface ArgumentTypeValidator {
/**
* Main logic for validating a single input type. Returns {@code true} if the argument is valid for the
@@ -35,10 +34,17 @@ public interface SingleInputTypeValidator extends InputTypeValidator {
*
* @param callContext provides details about the function call
* @param argumentPos argument index in the {@link CallContext}
- * @param validatorPos logical index that defines the expected validation logic
* @param throwOnFailure whether this function is allowed to throw an {@link ValidationException}
* with a meaningful exception in case the validation is not successful or
* if this function should simply return {@code false}.
*/
- boolean validateArgument(CallContext callContext, int argumentPos, int validatorPos, boolean throwOnFailure);
+ boolean validateArgument(CallContext callContext, int argumentPos, boolean throwOnFailure);
+
+ /**
+ * Returns a summary of the function's expected argument at {@code argumentPos}.
+ *
+ * @param functionDefinition the function definition that defines the function currently being called.
+ * @param argumentPos the position within the function call for which the signature should be retrieved
+ */
+ Signature.Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ConstantArgumentCount.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ConstantArgumentCount.java
index 6cfa439..4ea2df7 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ConstantArgumentCount.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ConstantArgumentCount.java
@@ -26,6 +26,8 @@ import java.util.Optional;
/**
* Helper class for {@link ArgumentCount} with constant boundaries.
+ *
+ * <p>Note: All boundaries of this class are inclusive.
*/
@Internal
public final class ConstantArgumentCount implements ArgumentCount {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeValidators.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeValidators.java
index 9cfc734..0790147 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeValidators.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeValidators.java
@@ -20,13 +20,14 @@ package org.apache.flink.table.types.inference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.validators.AndTypeArgumentValidator;
import org.apache.flink.table.types.inference.validators.AnyTypeValidator;
-import org.apache.flink.table.types.inference.validators.CompositeTypeValidator;
-import org.apache.flink.table.types.inference.validators.CompositeTypeValidator.Composition;
import org.apache.flink.table.types.inference.validators.ExplicitTypeValidator;
import org.apache.flink.table.types.inference.validators.LiteralTypeValidator;
+import org.apache.flink.table.types.inference.validators.OrTypeArgumentValidator;
+import org.apache.flink.table.types.inference.validators.OrTypeInputValidator;
import org.apache.flink.table.types.inference.validators.PassingTypeValidator;
-import org.apache.flink.table.types.inference.validators.SingleInputTypeValidator;
+import org.apache.flink.table.types.inference.validators.SequenceInputValidator;
import org.apache.flink.table.types.inference.validators.VaryingSequenceTypeValidator;
import org.apache.flink.table.types.logical.LogicalType;
@@ -38,7 +39,7 @@ import java.util.stream.Collectors;
* Validators for checking the input data types of a function call.
*
* @see InputTypeValidator
- * @see SingleInputTypeValidator
+ * @see ArgumentTypeValidator
*/
@Internal
public final class InputTypeValidators {
@@ -46,96 +47,112 @@ public final class InputTypeValidators {
/**
* Validator that does not perform any validation and always passes.
*/
- public static final InputTypeValidator PASSING = new PassingTypeValidator();
+ public static final PassingTypeValidator PASSING = new PassingTypeValidator();
/**
* Validator that checks for a single argument that can be of any type.
*/
- public static final SingleInputTypeValidator ANY = new AnyTypeValidator();
+ public static final AnyTypeValidator ANY = new AnyTypeValidator();
/**
* Validator that checks if a single argument is a literal.
*/
- public static final SingleInputTypeValidator LITERAL = new LiteralTypeValidator(false);
+ public static final LiteralTypeValidator LITERAL = new LiteralTypeValidator(false);
/**
* Validator that checks if a single argument is a literal or null.
*/
- public static final SingleInputTypeValidator LITERAL_OR_NULL = new LiteralTypeValidator(true);
+ public static final LiteralTypeValidator LITERAL_OR_NULL = new LiteralTypeValidator(true);
/**
- * Validator that checks if each operand corresponds to an explicitly defined logical type.
+ * Validator that checks if a single argument corresponds to an explicitly defined logical type.
*
* <p>Note: The validation happens on {@link LogicalType} level only.
*/
- public static SingleInputTypeValidator explicit(DataType... expectedDataTypes) {
- final List<LogicalType> expectedTypes = Arrays.stream(expectedDataTypes)
- .map(DataType::getLogicalType)
- .collect(Collectors.toList());
- return new ExplicitTypeValidator(expectedTypes);
+ public static ArgumentTypeValidator explicit(DataType expectedDataType) {
+ return new ExplicitTypeValidator(expectedDataType.getLogicalType());
}
/**
- * Conjunction of multiple {@link SingleInputTypeValidator}s into one like {@code f(NUMERIC && LITERAL)}.
+ * Conjunction of multiple {@link ArgumentTypeValidator}s into one like {@code f(NUMERIC && LITERAL)}.
*/
- public static SingleInputTypeValidator and(SingleInputTypeValidator... validators) {
- return new CompositeTypeValidator(Composition.AND, Arrays.asList(validators), null);
+ public static ArgumentTypeValidator and(ArgumentTypeValidator... validators) {
+ return new AndTypeArgumentValidator(Arrays.asList(validators));
}
/**
- * Conjunction of multiple {@link InputTypeValidator}s into one like {@code f(NUMERIC) && f(LITERAL)}.
+ * Conjunction of multiple {@link ArgumentTypeValidator}s into one like {@code f(NUMERIC || STRING)}.
*/
- public static InputTypeValidator and(InputTypeValidator... validators) {
- return new CompositeTypeValidator(Composition.AND, Arrays.asList(validators), null);
+ public static ArgumentTypeValidator or(ArgumentTypeValidator... validators) {
+ return new OrTypeArgumentValidator(Arrays.asList(validators));
}
/**
- * Conjunction of multiple {@link SingleInputTypeValidator}s into one like {@code f(NUMERIC || STRING)}.
+ * Conjunction of multiple {@link InputTypeValidator}s into one like {@code f(NUMERIC) || f(STRING)}.
*/
- public static SingleInputTypeValidator or(SingleInputTypeValidator... validators) {
- return new CompositeTypeValidator(Composition.OR, Arrays.asList(validators), null);
+ public static InputTypeValidator or(InputTypeValidator... validators) {
+ return new OrTypeInputValidator(Arrays.asList(validators));
}
/**
- * Conjunction of multiple {@link InputTypeValidator}s into one like {@code f(NUMERIC) || f(STRING)}.
+ * Validator that checks if each operand corresponds to an explicitly defined logical type
+ * like {@code f(STRING, INT)}.
+ *
+ * <p>Note: The validation happens on {@link LogicalType} level only.
*/
- public static InputTypeValidator or(InputTypeValidator... validators) {
- return new CompositeTypeValidator(Composition.OR, Arrays.asList(validators), null);
+ public static InputTypeValidator explicitSequence(DataType... expectedDataTypes) {
+ final List<ArgumentTypeValidator> validators = Arrays.stream(expectedDataTypes)
+ .map(InputTypeValidators::explicit)
+ .collect(Collectors.toList());
+ return new SequenceInputValidator(validators, null);
+ }
+
+ /**
+ * Validator that checks if each named operand corresponds to an explicitly defined logical type
+ * like {@code f(s STRING, i INT)}.
+ *
+ * <p>Note: The validation happens on {@link LogicalType} level only.
+ */
+ public static InputTypeValidator explicitSequence(String[] argumentNames, DataType[] expectedDataTypes) {
+ final List<ArgumentTypeValidator> validators = Arrays.stream(expectedDataTypes)
+ .map(InputTypeValidators::explicit)
+ .collect(Collectors.toList());
+ return new SequenceInputValidator(validators, Arrays.asList(argumentNames));
}
/**
- * A sequence of {@link SingleInputTypeValidator}s for validating an entire function signature
+ * A sequence of {@link ArgumentTypeValidator}s for validating an entire function signature
* like {@code f(STRING, NUMERIC)}.
*/
- public static InputTypeValidator sequence(SingleInputTypeValidator... validators) {
- return new CompositeTypeValidator(Composition.SEQUENCE, Arrays.asList(validators), null);
+ public static InputTypeValidator sequence(ArgumentTypeValidator... validators) {
+ return new SequenceInputValidator(Arrays.asList(validators), null);
}
/**
- * A sequence of {@link SingleInputTypeValidator}s for validating an entire named function signature
+ * A sequence of {@link ArgumentTypeValidator}s for validating an entire named function signature
* like {@code f(s STRING, n NUMERIC)}.
*/
- public static InputTypeValidator sequence(String[] argumentNames, SingleInputTypeValidator[] validators) {
- return new CompositeTypeValidator(Composition.SEQUENCE, Arrays.asList(validators), Arrays.asList(argumentNames));
+ public static InputTypeValidator sequence(String[] argumentNames, ArgumentTypeValidator[] validators) {
+ return new SequenceInputValidator(Arrays.asList(validators), Arrays.asList(argumentNames));
}
/**
- * A varying sequence of {@link SingleInputTypeValidator}s for validating an entire function signature
+ * A varying sequence of {@link ArgumentTypeValidator}s for validating an entire function signature
* like {@code f(INT, STRING, NUMERIC...)}. The first n - 1 arguments must be constant and are validated
- * according to {@link #sequence(String[], SingleInputTypeValidator[])}. The n-th argument can
- * occur 0, 1, or more times.
+ * according to {@link #sequence(ArgumentTypeValidator[])}. The n-th argument can occur 0, 1, or
+ * more times.
*/
- public static InputTypeValidator varyingSequence(SingleInputTypeValidator... validators) {
+ public static InputTypeValidator varyingSequence(ArgumentTypeValidator... validators) {
return new VaryingSequenceTypeValidator(Arrays.asList(validators), null);
}
/**
- * A varying sequence of {@link SingleInputTypeValidator}s for validating an entire function signature
+ * A varying sequence of {@link ArgumentTypeValidator}s for validating an entire function signature
* like {@code f(i INT, str STRING, n NUMERIC...)}. The first n - 1 arguments must be constant and are validated
- * according to {@link #sequence(String[], SingleInputTypeValidator[])}. The n-th argument can
- * occur 0, 1, or more times.
+ * according to {@link #sequence(String[], ArgumentTypeValidator[])}. The n-th argument can occur 0, 1,
+ * or more times.
*/
- public static InputTypeValidator varyingSequence(String[] argumentNames, SingleInputTypeValidator[] validators) {
+ public static InputTypeValidator varyingSequence(String[] argumentNames, ArgumentTypeValidator[] validators) {
return new VaryingSequenceTypeValidator(Arrays.asList(validators), Arrays.asList(argumentNames));
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/AndTypeArgumentValidator.java
similarity index 52%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java
copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/AndTypeArgumentValidator.java
index be03eab..43b9335 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/AndTypeArgumentValidator.java
@@ -20,57 +20,45 @@ package org.apache.flink.table.types.inference.validators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeValidator;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.util.Preconditions;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
- * Validator that checks if a single argument is a literal.
+ * Validator that checks for a conjunction of multiple {@link ArgumentTypeValidator}s into one like
+ * {@code f(NUMERIC && LITERAL)}.
*/
@Internal
-public class LiteralTypeValidator implements SingleInputTypeValidator {
+public final class AndTypeArgumentValidator implements ArgumentTypeValidator {
- private final boolean allowNull;
+ private final List<? extends ArgumentTypeValidator> validators;
- public LiteralTypeValidator(boolean allowNull) {
- this.allowNull = allowNull;
+ public AndTypeArgumentValidator(List<? extends ArgumentTypeValidator> validators) {
+ Preconditions.checkArgument(validators.size() > 0);
+ this.validators = validators;
}
@Override
- public boolean validateArgument(CallContext callContext, int argumentPos, int validatorPos, boolean throwOnFailure) {
- if (!callContext.isArgumentLiteral(argumentPos)) {
- if (throwOnFailure) {
- throw callContext.newValidationError("Literal expected.");
+ public boolean validateArgument(CallContext callContext, int argumentPos, boolean throwOnFailure) {
+ for (ArgumentTypeValidator validator : validators) {
+ if (!validator.validateArgument(callContext, argumentPos, throwOnFailure)) {
+ return false;
}
- return false;
- }
- if (callContext.isArgumentNull(argumentPos) && !allowNull) {
- if (throwOnFailure) {
- throw callContext.newValidationError("Literal must not be NULL.");
- }
- return false;
}
return true;
}
@Override
- public ArgumentCount getArgumentCount() {
- return ConstantArgumentCount.of(1);
- }
-
- @Override
- public boolean validate(CallContext callContext, boolean throwOnFailure) {
- return validateArgument(callContext, 0, 0, throwOnFailure);
- }
-
- @Override
- public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
- return Collections.singletonList(Signature.of(Signature.Argument.of("<LITERAL>")));
+ public Signature.Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+ final String argument = validators.stream()
+ .map(v -> v.getExpectedArgument(functionDefinition, argumentPos).getType())
+ .collect(Collectors.joining(" & ", "[", "]"));
+ return Signature.Argument.of(argument);
}
@Override
@@ -81,12 +69,12 @@ public class LiteralTypeValidator implements SingleInputTypeValidator {
if (o == null || getClass() != o.getClass()) {
return false;
}
- LiteralTypeValidator that = (LiteralTypeValidator) o;
- return allowNull == that.allowNull;
+ AndTypeArgumentValidator that = (AndTypeArgumentValidator) o;
+ return Objects.equals(validators, that.validators);
}
@Override
public int hashCode() {
- return Objects.hash(allowNull);
+ return Objects.hash(validators);
}
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/AnyTypeValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/AnyTypeValidator.java
index 9324ccc..4bb816e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/AnyTypeValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/AnyTypeValidator.java
@@ -20,8 +20,10 @@ package org.apache.flink.table.types.inference.validators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeValidator;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeValidator;
import org.apache.flink.table.types.inference.Signature;
import java.util.Collections;
@@ -31,14 +33,19 @@ import java.util.List;
* Validator that checks for a single argument that can be of any type.
*/
@Internal
-public final class AnyTypeValidator implements SingleInputTypeValidator {
+public final class AnyTypeValidator implements ArgumentTypeValidator, InputTypeValidator {
@Override
- public boolean validateArgument(CallContext callContext, int argumentPos, int validatorPos, boolean throwOnFailure) {
+ public boolean validateArgument(CallContext callContext, int argumentPos, boolean throwOnFailure) {
return true;
}
@Override
+ public Signature.Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+ return Signature.Argument.of("<ANY>");
+ }
+
+ @Override
public ArgumentCount getArgumentCount() {
return ConstantArgumentCount.of(1);
}
@@ -50,7 +57,7 @@ public final class AnyTypeValidator implements SingleInputTypeValidator {
@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
- return Collections.singletonList(Signature.of(Signature.Argument.of("<ANY>")));
+ return Collections.singletonList(Signature.of(getExpectedArgument(definition, 0)));
}
@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/CompositeTypeValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/CompositeTypeValidator.java
deleted file mode 100644
index e2892a1..0000000
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/CompositeTypeValidator.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.types.inference.validators;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.ArgumentCount;
-import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
-import org.apache.flink.table.types.inference.InputTypeValidator;
-import org.apache.flink.table.types.inference.Signature;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Validator that checks for set of {@link InputTypeValidator}s. It is able to combine function signature validators
- * or single argument validators.
- *
- * <p>This validator offers different semantics depending on the passed {@link Composition}.
- *
- * <p>AND: Conjunction of multiple {@link InputTypeValidator}s into one like {@code f(NUMERIC) && f(LITERAL)}. Only
- * the first validator is used for signature generation.
- *
- * <p>OR: Disjunction of multiple {@link InputTypeValidator}s into one like {@code f(NUMERIC) || f(STRING)}.
- *
- * <p>SEQUENCE: A sequence of {@link SingleInputTypeValidator}s for validating an entire function signature
- * like {@code f(STRING, NUMERIC)}.
- */
-@Internal
-public final class CompositeTypeValidator implements SingleInputTypeValidator {
-
- private final Composition composition;
-
- private final List<? extends InputTypeValidator> validators;
-
- private final @Nullable List<String> argumentNames;
-
- /**
- * Kind of composition for combining {@link InputTypeValidator}s.
- */
- public enum Composition {
- AND,
- OR,
- SEQUENCE
- }
-
- public CompositeTypeValidator(
- Composition composition,
- List<? extends InputTypeValidator> validators,
- @Nullable List<String> argumentNames) {
- Preconditions.checkArgument(validators.size() > 0);
- Preconditions.checkArgument(argumentNames == null || argumentNames.size() == validators.size());
- this.composition = composition;
- this.validators = validators;
- this.argumentNames = argumentNames;
- }
-
- @Override
- public boolean validateArgument(CallContext callContext, int argumentPos, int validatorPos, boolean throwOnFailure) {
- final List<SingleInputTypeValidator> singleValidators = validators.stream()
- .map(v -> (SingleInputTypeValidator) v)
- .collect(Collectors.toList());
-
- switch (composition) {
- case SEQUENCE:
- return singleValidators.get(validatorPos)
- .validateArgument(callContext, argumentPos, 0, throwOnFailure);
- case AND:
- for (SingleInputTypeValidator validator : singleValidators) {
- if (!validator.validateArgument(callContext, argumentPos, validatorPos, throwOnFailure)) {
- return false;
- }
- }
- return true;
- case OR:
- for (SingleInputTypeValidator validator : singleValidators) {
- if (validator.validateArgument(callContext, argumentPos, validatorPos, false)) {
- return true;
- }
- }
- // generate a helpful exception if possible
- if (throwOnFailure) {
- for (SingleInputTypeValidator validator : singleValidators) {
- validator.validateArgument(callContext, argumentPos, validatorPos, true);
- }
- }
- return false;
- default:
- throw new IllegalStateException("Unsupported composition.");
- }
- }
-
- @Override
- public ArgumentCount getArgumentCount() {
- switch (composition) {
- case SEQUENCE:
- return ConstantArgumentCount.of(validators.size());
- case AND:
- case OR:
- default:
- final List<ArgumentCount> counts = new AbstractList<ArgumentCount>() {
- public ArgumentCount get(int index) {
- return validators.get(index).getArgumentCount();
- }
-
- public int size() {
- return validators.size();
- }
- };
- final Integer min = commonMin(counts);
- final Integer max = commonMax(counts);
- final ArgumentCount compositeCount = new ArgumentCount() {
- @Override
- public boolean isValidCount(int count) {
- switch (composition) {
- case AND:
- for (ArgumentCount c : counts) {
- if (!c.isValidCount(count)) {
- return false;
- }
- }
- return true;
- case OR:
- default:
- for (ArgumentCount c : counts) {
- if (c.isValidCount(count)) {
- return true;
- }
- }
- return false;
- }
- }
-
- @Override
- public Optional<Integer> getMinCount() {
- return Optional.ofNullable(min);
- }
-
- @Override
- public Optional<Integer> getMaxCount() {
- return Optional.ofNullable(max);
- }
- };
-
- // use constant count if applicable
- if (min == null || max == null) {
- // no boundaries
- return compositeCount;
- }
- for (int i = min; i <= max; i++) {
- if (!compositeCount.isValidCount(i)) {
- // not the full range
- return compositeCount;
- }
- }
- if (min.equals(max)) {
- return ConstantArgumentCount.of(min);
- }
- return ConstantArgumentCount.between(min, max);
- }
- }
-
- @Override
- public boolean validate(CallContext callContext, boolean throwOnFailure) {
- switch (composition) {
- case SEQUENCE:
- final List<DataType> dataTypes = callContext.getArgumentDataTypes();
- if (dataTypes.size() != validators.size()) {
- return false;
- }
- for (int i = 0; i < validators.size(); i++) {
- final InputTypeValidator validator = validators.get(i);
- if (!((SingleInputTypeValidator) validator).validateArgument(callContext, i, 0, false)) {
- return false;
- }
- }
- return true;
- case AND:
- for (final InputTypeValidator validator : validators) {
- if (!validator.validate(callContext, false)) {
- return false;
- }
- }
- return true;
- case OR:
- for (final InputTypeValidator validator : validators) {
- if (validator.validate(callContext, false)) {
- return true;
- }
- }
- return false;
- default:
- throw new IllegalStateException("Unsupported composition.");
- }
- }
-
- @Override
- public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
- switch (composition) {
- case SEQUENCE:
- final List<Signature.Argument> arguments = new ArrayList<>();
- // according to precondition we can assume single type validators
- // thus we can pass constant 0s
- for (int i = 0; i < validators.size(); i++) {
- final List<Signature> signatures = validators.get(i).getExpectedSignatures(definition);
- final String type;
- if (signatures.size() > 1) {
- type = signatures.stream()
- .map(s -> s.getArguments().get(0).getType())
- .collect(Collectors.joining(" | ", "[", "]"));
- } else {
- type = signatures.get(0).getArguments().get(0).getType();
- }
- if (argumentNames != null) {
- arguments.add(Signature.Argument.of(argumentNames.get(i), type));
- } else {
- arguments.add(Signature.Argument.of(type));
- }
- }
- return Collections.singletonList(Signature.of(arguments));
- case AND:
- final List<Signature> signatures = validators.stream()
- .flatMap(v -> v.getExpectedSignatures(definition).stream())
- .collect(Collectors.toList());
- // handle only the case for simple single type conjunction like "<LITERAL> and STRING"
- final boolean isSimpleAnd = signatures.size() == validators.size() &&
- signatures.stream().allMatch(s -> s.getArguments().size() == 1);
- if (isSimpleAnd) {
- final String type = signatures.stream()
- .map(s -> s.getArguments().get(0).getType())
- .collect(Collectors.joining(" & ", "[", "]"));
- return Collections.singletonList(Signature.of(Signature.Argument.of(type)));
- }
- // use the signatures of the first validator for simplification
- return validators.get(0).getExpectedSignatures(definition);
- case OR:
- return validators.stream()
- .flatMap(v -> v.getExpectedSignatures(definition).stream())
- .collect(Collectors.toList());
- default:
- throw new IllegalStateException("Unsupported composition.");
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- CompositeTypeValidator that = (CompositeTypeValidator) o;
- return composition == that.composition &&
- validators.equals(that.validators) &&
- Objects.equals(argumentNames, that.argumentNames);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(composition, validators, argumentNames);
- }
-
-// --------------------------------------------------------------------------------------------
-
- /**
- * Returns the common minimum argument count or null if undefined.
- */
- private @Nullable Integer commonMin(List<ArgumentCount> counts) {
- // min=5, min=3, min=0 -> min=0
- // min=5, min=3, min=0, min=null -> min=null
- int commonMin = Integer.MAX_VALUE;
- for (ArgumentCount count : counts) {
- final Optional<Integer> min = count.getMinCount();
- if (!min.isPresent()) {
- return null;
- }
- commonMin = Math.min(commonMin, min.get());
- }
- if (commonMin == Integer.MAX_VALUE) {
- return null;
- }
- return commonMin;
- }
-
- /**
- * Returns the common maximum argument count or null if undefined.
- */
- private @Nullable Integer commonMax(List<ArgumentCount> counts) {
- // max=5, max=3, max=0 -> max=5
- // max=5, max=3, max=0, max=null -> max=null
- int commonMax = Integer.MIN_VALUE;
- for (ArgumentCount count : counts) {
- final Optional<Integer> max = count.getMaxCount();
- if (!max.isPresent()) {
- return null;
- }
- commonMax = Math.max(commonMax, max.get());
- }
- if (commonMax == Integer.MIN_VALUE) {
- return null;
- }
- return commonMax;
- }
-}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/ExplicitTypeValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/ExplicitTypeValidator.java
index ae37386..2005846 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/ExplicitTypeValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/ExplicitTypeValidator.java
@@ -20,35 +20,29 @@ package org.apache.flink.table.types.inference.validators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeValidator;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
-import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.inference.Signature.Argument;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
import org.apache.flink.util.Preconditions;
-import java.util.Collections;
-import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* Validator that checks if each operand corresponds to an explicitly defined logical type.
*/
@Internal
-public final class ExplicitTypeValidator implements SingleInputTypeValidator {
+public final class ExplicitTypeValidator implements ArgumentTypeValidator {
- private final List<LogicalType> expectedTypes;
+ private final LogicalType expectedType;
- public ExplicitTypeValidator(List<LogicalType> expectedTypes) {
- this.expectedTypes = Preconditions.checkNotNull(expectedTypes);
+ public ExplicitTypeValidator(LogicalType expectedType) {
+ this.expectedType = Preconditions.checkNotNull(expectedType);
}
@Override
- public boolean validateArgument(CallContext callContext, int argumentPos, int validatorPos, boolean throwOnFailure) {
- final LogicalType expectedType = expectedTypes.get(validatorPos);
+ public boolean validateArgument(CallContext callContext, int argumentPos, boolean throwOnFailure) {
final LogicalType actualType = callContext.getArgumentDataTypes().get(argumentPos).getLogicalType();
// quick path
if (expectedType.equals(actualType)) {
@@ -68,26 +62,8 @@ public final class ExplicitTypeValidator implements SingleInputTypeValidator {
}
@Override
- public ArgumentCount getArgumentCount() {
- return ConstantArgumentCount.of(expectedTypes.size());
- }
-
- @Override
- public boolean validate(CallContext callContext, boolean throwOnFailure) {
- for (int i = 0; i < expectedTypes.size(); i++) {
- if (!validateArgument(callContext, i, i, throwOnFailure)) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
- final List<Signature.Argument> args = expectedTypes.stream()
- .map(expectedDataType -> Argument.of(expectedDataType.toString()))
- .collect(Collectors.toList());
- return Collections.singletonList(Signature.of(args));
+ public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+ return Argument.of(expectedType.asSummaryString());
}
@Override
@@ -99,11 +75,11 @@ public final class ExplicitTypeValidator implements SingleInputTypeValidator {
return false;
}
ExplicitTypeValidator that = (ExplicitTypeValidator) o;
- return expectedTypes.equals(that.expectedTypes);
+ return Objects.equals(expectedType, that.expectedType);
}
@Override
public int hashCode() {
- return Objects.hash(expectedTypes);
+ return Objects.hash(expectedType);
}
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java
index be03eab..1da7c18 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java
@@ -21,8 +21,10 @@ package org.apache.flink.table.types.inference.validators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeValidator;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeValidator;
import org.apache.flink.table.types.inference.Signature;
import java.util.Collections;
@@ -33,7 +35,7 @@ import java.util.Objects;
* Validator that checks if a single argument is a literal.
*/
@Internal
-public class LiteralTypeValidator implements SingleInputTypeValidator {
+public final class LiteralTypeValidator implements ArgumentTypeValidator, InputTypeValidator {
private final boolean allowNull;
@@ -42,7 +44,7 @@ public class LiteralTypeValidator implements SingleInputTypeValidator {
}
@Override
- public boolean validateArgument(CallContext callContext, int argumentPos, int validatorPos, boolean throwOnFailure) {
+ public boolean validateArgument(CallContext callContext, int argumentPos, boolean throwOnFailure) {
if (!callContext.isArgumentLiteral(argumentPos)) {
if (throwOnFailure) {
throw callContext.newValidationError("Literal expected.");
@@ -59,18 +61,23 @@ public class LiteralTypeValidator implements SingleInputTypeValidator {
}
@Override
+ public Signature.Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+ return Signature.Argument.of("<LITERAL>");
+ }
+
+ @Override
public ArgumentCount getArgumentCount() {
return ConstantArgumentCount.of(1);
}
@Override
public boolean validate(CallContext callContext, boolean throwOnFailure) {
- return validateArgument(callContext, 0, 0, throwOnFailure);
+ return validateArgument(callContext, 0, throwOnFailure);
}
@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
- return Collections.singletonList(Signature.of(Signature.Argument.of("<LITERAL>")));
+ return Collections.singletonList(Signature.of(getExpectedArgument(definition, 0)));
}
@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/OrTypeArgumentValidator.java
similarity index 50%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java
copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/OrTypeArgumentValidator.java
index be03eab..eb91b8d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/LiteralTypeValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/OrTypeArgumentValidator.java
@@ -20,57 +20,51 @@ package org.apache.flink.table.types.inference.validators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeValidator;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.util.Preconditions;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
- * Validator that checks if a single argument is a literal.
+ * Validator that checks for a disjunction of multiple {@link ArgumentTypeValidator}s into one like
+ * {@code f(NUMERIC || STRING)}.
*/
@Internal
-public class LiteralTypeValidator implements SingleInputTypeValidator {
+public final class OrTypeArgumentValidator implements ArgumentTypeValidator {
- private final boolean allowNull;
+ private final List<? extends ArgumentTypeValidator> validators;
- public LiteralTypeValidator(boolean allowNull) {
- this.allowNull = allowNull;
+ public OrTypeArgumentValidator(List<? extends ArgumentTypeValidator> validators) {
+ Preconditions.checkArgument(validators.size() > 0);
+ this.validators = validators;
}
@Override
- public boolean validateArgument(CallContext callContext, int argumentPos, int validatorPos, boolean throwOnFailure) {
- if (!callContext.isArgumentLiteral(argumentPos)) {
- if (throwOnFailure) {
- throw callContext.newValidationError("Literal expected.");
+ public boolean validateArgument(CallContext callContext, int argumentPos, boolean throwOnFailure) {
+ for (ArgumentTypeValidator validator : validators) {
+ if (validator.validateArgument(callContext, argumentPos, false)) {
+ return true;
}
- return false;
}
- if (callContext.isArgumentNull(argumentPos) && !allowNull) {
- if (throwOnFailure) {
- throw callContext.newValidationError("Literal must not be NULL.");
+ // generate a helpful exception if possible
+ if (throwOnFailure) {
+ for (ArgumentTypeValidator validator : validators) {
+ validator.validateArgument(callContext, argumentPos, true);
}
- return false;
}
- return true;
- }
-
- @Override
- public ArgumentCount getArgumentCount() {
- return ConstantArgumentCount.of(1);
- }
-
- @Override
- public boolean validate(CallContext callContext, boolean throwOnFailure) {
- return validateArgument(callContext, 0, 0, throwOnFailure);
+ return false;
}
@Override
- public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
- return Collections.singletonList(Signature.of(Signature.Argument.of("<LITERAL>")));
+ public Signature.Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+ final String argument = validators.stream()
+ .map(v -> v.getExpectedArgument(functionDefinition, argumentPos).getType())
+ .collect(Collectors.joining(" | ", "[", "]"));
+ return Signature.Argument.of(argument);
}
@Override
@@ -81,12 +75,12 @@ public class LiteralTypeValidator implements SingleInputTypeValidator {
if (o == null || getClass() != o.getClass()) {
return false;
}
- LiteralTypeValidator that = (LiteralTypeValidator) o;
- return allowNull == that.allowNull;
+ OrTypeArgumentValidator that = (OrTypeArgumentValidator) o;
+ return Objects.equals(validators, that.validators);
}
@Override
public int hashCode() {
- return Objects.hash(allowNull);
+ return Objects.hash(validators);
}
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/OrTypeInputValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/OrTypeInputValidator.java
new file mode 100644
index 0000000..c1cbcf6
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/OrTypeInputValidator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.validators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeValidator;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Validator that checks for a disjunction of multiple {@link InputTypeValidator}s into one like
+ * {@code f(NUMERIC) || f(STRING)}.
+ */
+@Internal
+public final class OrTypeInputValidator implements InputTypeValidator {
+
+ private final List<? extends InputTypeValidator> validators;
+
+ public OrTypeInputValidator(List<? extends InputTypeValidator> validators) {
+ Preconditions.checkArgument(validators.size() > 0);
+ this.validators = validators;
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ final List<ArgumentCount> counts = new AbstractList<ArgumentCount>() {
+ public ArgumentCount get(int index) {
+ return validators.get(index).getArgumentCount();
+ }
+
+ public int size() {
+ return validators.size();
+ }
+ };
+ final Integer min = commonMin(counts);
+ final Integer max = commonMax(counts);
+ final ArgumentCount compositeCount = new ArgumentCount() {
+ @Override
+ public boolean isValidCount(int count) {
+ for (ArgumentCount c : counts) {
+ if (c.isValidCount(count)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Optional<Integer> getMinCount() {
+ return Optional.ofNullable(min);
+ }
+
+ @Override
+ public Optional<Integer> getMaxCount() {
+ return Optional.ofNullable(max);
+ }
+ };
+
+ // use constant count if applicable
+ if (min == null || max == null) {
+ // no boundaries
+ return compositeCount;
+ }
+ for (int i = min; i <= max; i++) {
+ if (!compositeCount.isValidCount(i)) {
+ // not the full range
+ return compositeCount;
+ }
+ }
+ if (min.equals(max)) {
+ return ConstantArgumentCount.of(min);
+ }
+ return ConstantArgumentCount.between(min, max);
+ }
+
+ @Override
+ public boolean validate(CallContext callContext, boolean throwOnFailure) {
+ for (final InputTypeValidator validator : validators) {
+ if (validator.validate(callContext, false)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
+ return validators.stream()
+ .flatMap(v -> v.getExpectedSignatures(definition).stream())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OrTypeInputValidator that = (OrTypeInputValidator) o;
+ return Objects.equals(validators, that.validators);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(validators);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the common minimum argument count or null if undefined.
+ */
+ private @Nullable Integer commonMin(List<ArgumentCount> counts) {
+ // min=5, min=3, min=0 -> min=0
+ // min=5, min=3, min=0, min=null -> min=null
+ int commonMin = Integer.MAX_VALUE;
+ for (ArgumentCount count : counts) {
+ final Optional<Integer> min = count.getMinCount();
+ if (!min.isPresent()) {
+ return null;
+ }
+ commonMin = Math.min(commonMin, min.get());
+ }
+ if (commonMin == Integer.MAX_VALUE) {
+ return null;
+ }
+ return commonMin;
+ }
+
+ /**
+ * Returns the common maximum argument count or null if undefined.
+ */
+ private @Nullable Integer commonMax(List<ArgumentCount> counts) {
+ // max=5, max=3, max=0 -> max=5
+ // max=5, max=3, max=0, max=null -> max=null
+ int commonMax = Integer.MIN_VALUE;
+ for (ArgumentCount count : counts) {
+ final Optional<Integer> max = count.getMaxCount();
+ if (!max.isPresent()) {
+ return null;
+ }
+ commonMax = Math.max(commonMax, max.get());
+ }
+ if (commonMax == Integer.MIN_VALUE) {
+ return null;
+ }
+ return commonMax;
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/PassingTypeValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/PassingTypeValidator.java
index a14f451..ecc272d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/PassingTypeValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/PassingTypeValidator.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types.inference.validators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeValidator;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeValidator;
@@ -34,7 +35,7 @@ import java.util.List;
* Validator that does not perform any validation and always passes.
*/
@Internal
-public final class PassingTypeValidator implements InputTypeValidator {
+public final class PassingTypeValidator implements InputTypeValidator, ArgumentTypeValidator {
private static final ArgumentCount PASSING_ARGUMENT_COUNT = ConstantArgumentCount.any();
@@ -62,4 +63,14 @@ public final class PassingTypeValidator implements InputTypeValidator {
public int hashCode() {
return PassingTypeValidator.class.hashCode();
}
+
+ @Override
+ public boolean validateArgument(CallContext callContext, int argumentPos, boolean throwOnFailure) {
+ return true;
+ }
+
+ @Override
+ public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+ return Argument.of("*");
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/SequenceInputValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/SequenceInputValidator.java
new file mode 100644
index 0000000..db1486f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/SequenceInputValidator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.validators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeValidator;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeValidator;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Validator that checks for a sequence of {@link ArgumentTypeValidator}s for validating an entire
+ * function signature like {@code f(STRING, NUMERIC)} or {@code f(s STRING, n NUMERIC)}.
+ */
+@Internal
+public final class SequenceInputValidator implements InputTypeValidator {
+
+ private final List<? extends ArgumentTypeValidator> validators;
+
+ private final @Nullable List<String> argumentNames;
+
+ public SequenceInputValidator(
+ List<? extends ArgumentTypeValidator> validators,
+ @Nullable List<String> argumentNames) {
+ Preconditions.checkArgument(validators.size() > 0);
+ Preconditions.checkArgument(argumentNames == null || argumentNames.size() == validators.size());
+ this.validators = validators;
+ this.argumentNames = argumentNames;
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return ConstantArgumentCount.of(validators.size());
+ }
+
+ @Override
+ public boolean validate(CallContext callContext, boolean throwOnFailure) {
+ final List<DataType> dataTypes = callContext.getArgumentDataTypes();
+ if (dataTypes.size() != validators.size()) {
+ return false;
+ }
+ for (int i = 0; i < validators.size(); i++) {
+ final ArgumentTypeValidator validator = validators.get(i);
+ if (!validator.validateArgument(callContext, i, throwOnFailure)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
+ final List<Signature.Argument> arguments = new ArrayList<>();
+ for (int i = 0; i < validators.size(); i++) {
+ if (argumentNames == null) {
+ arguments.add(validators.get(i).getExpectedArgument(definition, i));
+ } else {
+ arguments.add(Signature.Argument.of(
+ argumentNames.get(i),
+ validators.get(i).getExpectedArgument(definition, i).getType()));
+ }
+ }
+
+ return Collections.singletonList(Signature.of(arguments));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SequenceInputValidator that = (SequenceInputValidator) o;
+ return Objects.equals(validators, that.validators) &&
+ Objects.equals(argumentNames, that.argumentNames);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(validators, argumentNames);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/VaryingSequenceTypeValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/VaryingSequenceTypeValidator.java
index ce442f5..41b2812 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/VaryingSequenceTypeValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/validators/VaryingSequenceTypeValidator.java
@@ -21,57 +21,45 @@ package org.apache.flink.table.types.inference.validators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeValidator;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeValidator;
import org.apache.flink.table.types.inference.Signature;
-import org.apache.flink.table.types.inference.validators.CompositeTypeValidator.Composition;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
- * A varying sequence of {@link SingleInputTypeValidator}s for validating an entire function signature
- * like {@code f(INT, STRING, NUMERIC...)}. The first n - 1 arguments must be constant and are validated
- * according to {@link Composition#SEQUENCE}. The n-th argument can occur 0, 1, or more times.
+ * A varying sequence of {@link ArgumentTypeValidator}s for validating an entire function signature
+ * like {@code f(INT, STRING, NUMERIC...)}. The first n - 1 arguments must be constant. The n-th
+ * argument can occur 0, 1, or more times.
*/
@Internal
public final class VaryingSequenceTypeValidator implements InputTypeValidator {
private final int constantArgumentCount;
- private final SingleInputTypeValidator constantSequence;
+ private final List<ArgumentTypeValidator> constantValidators;
- private final SingleInputTypeValidator varyingValidator;
+ private final ArgumentTypeValidator varyingValidator;
- private final @Nullable String varyingArgumentName;
+ private final @Nullable List<String> argumentNames;
public VaryingSequenceTypeValidator(
- List<SingleInputTypeValidator> validators,
+ List<ArgumentTypeValidator> validators,
@Nullable List<String> argumentNames) {
Preconditions.checkArgument(validators.size() > 0);
Preconditions.checkArgument(argumentNames == null || argumentNames.size() == validators.size());
constantArgumentCount = validators.size() - 1;
- final List<SingleInputTypeValidator> constantValidators = validators.subList(0, constantArgumentCount);
- if (argumentNames == null) {
- constantSequence = new CompositeTypeValidator(
- Composition.SEQUENCE,
- constantValidators,
- null);
- varyingArgumentName = null;
- } else {
- constantSequence = new CompositeTypeValidator(
- Composition.SEQUENCE,
- constantValidators,
- argumentNames.subList(0, constantArgumentCount));
- varyingArgumentName = argumentNames.get(constantArgumentCount);
- }
+ constantValidators = validators.subList(0, constantArgumentCount);
varyingValidator = validators.get(constantArgumentCount);
+ this.argumentNames = argumentNames;
}
@Override
@@ -83,11 +71,11 @@ public final class VaryingSequenceTypeValidator implements InputTypeValidator {
public boolean validate(CallContext callContext, boolean throwOnFailure) {
for (int i = 0; i < callContext.getArgumentDataTypes().size(); i++) {
if (i < constantArgumentCount) {
- if (!constantSequence.validateArgument(callContext, i, i, throwOnFailure)) {
+ if (!constantValidators.get(i).validateArgument(callContext, i, throwOnFailure)) {
return false;
}
} else {
- if (!varyingValidator.validateArgument(callContext, i, 0, throwOnFailure)) {
+ if (!varyingValidator.validateArgument(callContext, i, throwOnFailure)) {
return false;
}
}
@@ -97,30 +85,31 @@ public final class VaryingSequenceTypeValidator implements InputTypeValidator {
@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
- final List<Signature> signatures = varyingValidator.getExpectedSignatures(definition);
- final String type;
- if (signatures.size() > 1) {
- type = signatures.stream()
- .map(s -> s.getArguments().get(0).getType())
- .collect(Collectors.joining(" | ", "[", "]"));
- } else {
- type = signatures.get(0).getArguments().get(0).getType();
- }
+ final Signature.Argument varyingArgument = varyingValidator.getExpectedArgument(
+ definition,
+ constantValidators.size());
final Signature.Argument newArg;
- if (varyingArgumentName == null) {
+ final String type = varyingArgument.getType();
+ if (argumentNames == null) {
newArg = Signature.Argument.of(type + "...");
} else {
- newArg = Signature.Argument.of(varyingArgumentName, type + "...");
+ newArg = Signature.Argument.of(argumentNames.get(argumentNames.size() - 1), type + "...");
}
- final List<Signature> constantSignature = constantSequence.getExpectedSignatures(definition);
- return constantSignature.stream()
- .map(s -> {
- final List<Signature.Argument> newArgs =
- Stream.concat(s.getArguments().stream(), Stream.of(newArg)).collect(Collectors.toList());
- return Signature.of(newArgs);
- })
- .collect(Collectors.toList());
+ final List<Signature.Argument> arguments = new ArrayList<>();
+ for (int i = 0; i < constantValidators.size(); i++) {
+ if (argumentNames == null) {
+ arguments.add(constantValidators.get(i).getExpectedArgument(definition, i));
+ } else {
+ arguments.add(Signature.Argument.of(
+ argumentNames.get(i),
+ constantValidators.get(i).getExpectedArgument(definition, i).getType()));
+ }
+ }
+
+ arguments.add(newArg);
+
+ return Collections.singletonList(Signature.of(arguments));
}
@Override
@@ -133,17 +122,13 @@ public final class VaryingSequenceTypeValidator implements InputTypeValidator {
}
VaryingSequenceTypeValidator that = (VaryingSequenceTypeValidator) o;
return constantArgumentCount == that.constantArgumentCount &&
- constantSequence.equals(that.constantSequence) &&
- varyingValidator.equals(that.varyingValidator) &&
- Objects.equals(varyingArgumentName, that.varyingArgumentName);
+ Objects.equals(constantValidators, that.constantValidators) &&
+ Objects.equals(varyingValidator, that.varyingValidator) &&
+ Objects.equals(argumentNames, that.argumentNames);
}
@Override
public int hashCode() {
- return Objects.hash(
- constantArgumentCount,
- constantSequence,
- varyingValidator,
- varyingArgumentName);
+ return Objects.hash(constantArgumentCount, constantValidators, varyingValidator, argumentNames);
}
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeValidatorsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeValidatorsTest.java
index efa5863..aac5449 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeValidatorsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeValidatorsTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.utils.CallContextMock;
import org.apache.flink.table.types.inference.utils.FunctionDefinitionMock;
-import org.apache.flink.table.types.inference.validators.SingleInputTypeValidator;
import org.junit.Rule;
import org.junit.Test;
@@ -36,18 +35,19 @@ import org.junit.runners.Parameterized.Parameters;
import javax.annotation.Nullable;
-import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static java.util.Arrays.asList;
import static org.apache.flink.table.types.inference.InputTypeValidators.ANY;
import static org.apache.flink.table.types.inference.InputTypeValidators.LITERAL;
import static org.apache.flink.table.types.inference.InputTypeValidators.LITERAL_OR_NULL;
import static org.apache.flink.table.types.inference.InputTypeValidators.PASSING;
import static org.apache.flink.table.types.inference.InputTypeValidators.and;
import static org.apache.flink.table.types.inference.InputTypeValidators.explicit;
+import static org.apache.flink.table.types.inference.InputTypeValidators.explicitSequence;
import static org.apache.flink.table.types.inference.InputTypeValidators.or;
import static org.apache.flink.table.types.inference.InputTypeValidators.sequence;
import static org.apache.flink.table.types.inference.InputTypeValidators.varyingSequence;
@@ -61,7 +61,7 @@ public class InputTypeValidatorsTest {
@Parameters
public static List<TestSpec> testData() {
- return Arrays.asList(
+ return asList(
// 2 arguments
TestSpec
.forValidator(PASSING)
@@ -76,31 +76,40 @@ public class InputTypeValidatorsTest {
// full equivalence
TestSpec
- .forValidator(explicit(DataTypes.INT()))
+ .forValidator(explicitSequence(DataTypes.INT()))
.inputTypes(DataTypes.INT())
.expectSuccess(),
+ // invalid named sequence
+ TestSpec
+ .forValidator(
+ explicitSequence(
+ new String[]{"i", "s"},
+ new DataType[]{DataTypes.INT(), DataTypes.STRING()}))
+ .inputTypes(DataTypes.INT())
+ .expectErrorMessage("Invalid input arguments. Expected signatures are:\nf(i INT, s STRING)"),
+
// incompatible nullability
TestSpec
- .forValidator(explicit(DataTypes.BIGINT().notNull()))
+ .forValidator(explicitSequence(DataTypes.BIGINT().notNull()))
.inputTypes(DataTypes.BIGINT())
.expectErrorMessage("Unsupported argument type. Expected type 'BIGINT NOT NULL' but actual type was 'BIGINT'."),
// implicit cast
TestSpec
- .forValidator(explicit(DataTypes.BIGINT()))
+ .forValidator(explicitSequence(DataTypes.BIGINT()))
.inputTypes(DataTypes.INT())
.expectSuccess(),
// incompatible types
TestSpec
- .forValidator(explicit(DataTypes.BIGINT()))
+ .forValidator(explicitSequence(DataTypes.BIGINT()))
.inputTypes(DataTypes.STRING())
.expectErrorMessage("Unsupported argument type. Expected type 'BIGINT' but actual type was 'STRING'."),
// incompatible number of arguments
TestSpec
- .forValidator(explicit(DataTypes.BIGINT(), DataTypes.BIGINT()))
+ .forValidator(explicitSequence(DataTypes.BIGINT(), DataTypes.BIGINT()))
.inputTypes(DataTypes.BIGINT())
.expectErrorMessage("Invalid number of arguments. At least 2 arguments expected but 1 passed."),
@@ -118,34 +127,22 @@ public class InputTypeValidatorsTest {
// left of OR
TestSpec
- .forValidator(or(explicit(DataTypes.INT()), explicit(DataTypes.NULL())))
+ .forValidator(or(explicitSequence(DataTypes.INT()), explicitSequence(DataTypes.NULL())))
.inputTypes(DataTypes.INT())
.expectSuccess(),
// right of OR
TestSpec
- .forValidator(or(explicit(DataTypes.INT()), explicit(DataTypes.NULL())))
+ .forValidator(or(explicitSequence(DataTypes.INT()), explicitSequence(DataTypes.NULL())))
.inputTypes(DataTypes.NULL())
.expectSuccess(),
// invalid type in OR
TestSpec
- .forValidator(or(explicit(DataTypes.INT()), explicit(DataTypes.NULL())))
+ .forValidator(or(explicitSequence(DataTypes.INT()), explicitSequence(DataTypes.NULL())))
.inputTypes(DataTypes.BOOLEAN())
.expectErrorMessage("Invalid input arguments. Expected signatures are:\nf(INT)\nf(NULL)"),
- // invalid number of arguments in AND
- TestSpec
- .forValidator(and(explicit(DataTypes.INT()), explicit(DataTypes.INT(), DataTypes.STRING())))
- .inputTypes(DataTypes.INT())
- .expectErrorMessage("Invalid number of arguments. 1 arguments passed."),
-
- // accepted combination of AND
- TestSpec
- .forValidator(and(explicit(DataTypes.INT()), PASSING))
- .inputTypes(DataTypes.INT())
- .expectSuccess(),
-
// explicit sequence
TestSpec
.forValidator(sequence(explicit(DataTypes.INT()), explicit(DataTypes.BOOLEAN())))
@@ -166,7 +163,9 @@ public class InputTypeValidatorsTest {
// invalid named sequence
TestSpec
- .forValidator(sequence(new String[]{"any", "int"}, new SingleInputTypeValidator[]{ANY, explicit(DataTypes.INT())}))
+ .forValidator(sequence(
+ new String[]{"any", "int"},
+ new ArgumentTypeValidator[]{ANY, explicit(DataTypes.INT())}))
.inputTypes(DataTypes.STRING(), DataTypes.BOOLEAN())
.expectErrorMessage("Invalid input arguments. Expected signatures are:\nf(any <ANY>, int INT)"),
@@ -201,7 +200,7 @@ public class InputTypeValidatorsTest {
.forValidator(
varyingSequence(
new String[]{"i", "s", "var"},
- new SingleInputTypeValidator[]{
+ new ArgumentTypeValidator[]{
explicit(DataTypes.INT()),
explicit(DataTypes.STRING()),
explicit(DataTypes.BOOLEAN())}))
@@ -213,7 +212,7 @@ public class InputTypeValidatorsTest {
.forValidator(
varyingSequence(
new String[]{"i", "s", "var"},
- new SingleInputTypeValidator[]{
+ new ArgumentTypeValidator[]{
explicit(DataTypes.INT()),
explicit(DataTypes.STRING()),
explicit(DataTypes.BOOLEAN())}))
@@ -225,7 +224,7 @@ public class InputTypeValidatorsTest {
.forValidator(
varyingSequence(
new String[]{"i", "s", "var"},
- new SingleInputTypeValidator[]{
+ new ArgumentTypeValidator[]{
explicit(DataTypes.INT()),
explicit(DataTypes.STRING()),
explicit(DataTypes.BOOLEAN())}))
@@ -237,7 +236,7 @@ public class InputTypeValidatorsTest {
.forValidator(
varyingSequence(
new String[]{"i", "s", "var"},
- new SingleInputTypeValidator[]{
+ new ArgumentTypeValidator[]{
explicit(DataTypes.INT()),
explicit(DataTypes.STRING()),
explicit(DataTypes.BOOLEAN())}))
@@ -249,7 +248,7 @@ public class InputTypeValidatorsTest {
.forValidator(
varyingSequence(
new String[]{"i", "s", "var"},
- new SingleInputTypeValidator[]{
+ new ArgumentTypeValidator[]{
explicit(DataTypes.INT()),
explicit(DataTypes.STRING()),
or(explicit(DataTypes.BOOLEAN()), explicit(DataTypes.INT()))}))
@@ -261,7 +260,7 @@ public class InputTypeValidatorsTest {
.forValidator(
varyingSequence(
new String[]{"i", "s", "var"},
- new SingleInputTypeValidator[]{
+ new ArgumentTypeValidator[]{
explicit(DataTypes.INT()),
explicit(DataTypes.STRING()),
or(explicit(DataTypes.BOOLEAN()), explicit(DataTypes.INT()))}))
@@ -329,7 +328,7 @@ public class InputTypeValidatorsTest {
}
TestSpec inputTypes(DataType... dataTypes) {
- this.inputTypes = Arrays.asList(dataTypes);
+ this.inputTypes = asList(dataTypes);
return this;
}