You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/23 15:41:20 UTC
[flink] branch master updated: [FLINK-13266][table] Port
function-related descriptors to flink-table-common
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 24f1dce [FLINK-13266][table] Port function-related descriptors to flink-table-common
24f1dce is described below
commit 24f1dce1ceca35ef0177fa16648fd40cbcb52ded
Author: godfreyhe <go...@163.com>
AuthorDate: Thu Jul 18 19:26:36 2019 +0800
[FLINK-13266][table] Port function-related descriptors to flink-table-common
FileSystem/OldCsv/RowtimeValidator/SchemaValidator will be ported in other commits
---
.../flink/table/descriptors/ClassInstance.java | 232 +++++++++++++++++++
.../table/descriptors/ClassInstanceValidator.java | 70 ++++++
.../table/descriptors/FunctionDescriptor.java | 56 +++++
.../descriptors/FunctionDescriptorValidator.java | 49 +++++
.../table/descriptors/HierarchyDescriptor.java} | 27 ++-
.../descriptors/HierarchyDescriptorValidator.java} | 40 ++--
.../flink/table/descriptors/LiteralValue.java | 187 ++++++++++++++++
.../table/descriptors/LiteralValueValidator.java | 162 ++++++++++++++
.../flink/table/descriptors/ClassInstance.scala | 245 ---------------------
.../table/descriptors/ClassInstanceValidator.scala | 59 -----
.../table/descriptors/FunctionDescriptor.scala | 62 ------
.../descriptors/FunctionDescriptorValidator.scala | 57 -----
.../flink/table/descriptors/LiteralValue.scala | 228 -------------------
.../table/descriptors/LiteralValueValidator.scala | 145 ------------
.../table/descriptors/ClassInstanceTest.scala | 12 +-
.../table/descriptors/FunctionDescriptorTest.scala | 6 +-
.../flink/table/descriptors/LiteralValueTest.scala | 18 +-
.../table/functions/FunctionServiceTest.scala | 30 +--
18 files changed, 828 insertions(+), 857 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstance.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstance.java
new file mode 100644
index 0000000..573157d
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstance.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Either;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.types.Either.Left;
+import static org.apache.flink.types.Either.Right;
+
+/**
+ * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
+ * with a public constructor (with or without parameters).
+ */
+@PublicEvolving
+public class ClassInstance extends HierarchyDescriptor {
+
+ private String className;
+ // the parameter is either a literal value or the instance of a class
+ private List<Either<LiteralValue, ClassInstance>> constructor = new ArrayList<>();
+
+ /**
+ * Sets the fully qualified class name for creating an instance.
+ *
+ * <p>E.g. org.example.MyClass or org.example.MyClass$StaticInnerClass
+ *
+ * @param className fully qualified class name
+ */
+ public ClassInstance of(String className) {
+ this.className = className;
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of literal type. The type is automatically derived from
+ * the value. Currently, this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. Expression
+ * values are not allowed.
+ *
+ * <p>Examples:
+ * - "true", "false" -> BOOLEAN
+ * - "42", "-5" -> INT
+ * - "2.0", "1234.222" -> DOUBLE
+ * - VARCHAR otherwise
+ *
+ * <p>For other types and explicit type declaration use {@link #parameter(String, String)} or
+ * {@link #parameter(TypeInformation, String)}.
+ */
+ public ClassInstance parameterString(String valueString) {
+ constructor.add(Left(new LiteralValue().value(valueString)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of literal type. The type is explicitly defined using a
+ * type string such as VARCHAR, FLOAT, BOOLEAN, INT, BIGINT, etc. The value is parsed
+ * accordingly. Expression values are not allowed.
+ *
+ * @param typeString the type string that define how to parse the given value string
+ * @param valueString the literal value to be parsed
+ */
+ public ClassInstance parameter(String typeString, String valueString) {
+ constructor.add(Left(new LiteralValue().of(typeString).value(valueString)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of literal type. The type is explicitly defined using
+ * type information. The value is parsed accordingly. Expression values are not allowed.
+ *
+ * @param typeInfo the type that define how to parse the given value string
+ * @param valueString the literal value to be parsed
+ */
+ public ClassInstance parameter(TypeInformation<?> typeInfo, String valueString) {
+ constructor.add(Left(new LiteralValue().of(typeInfo).value(valueString)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of BOOLEAN type.
+ *
+ * @param value BOOLEAN value
+ */
+ public ClassInstance parameter(boolean value) {
+ constructor.add(Left(new LiteralValue().of(Types.BOOLEAN).value(value)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of DOUBLE type.
+ *
+ * @param value DOUBLE value
+ */
+ public ClassInstance parameter(double value) {
+ constructor.add(Left(new LiteralValue().of(Types.DOUBLE).value(value)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of FLOAT type.
+ *
+ * @param value FLOAT value
+ */
+ public ClassInstance parameter(float value) {
+ constructor.add(Left(new LiteralValue().of(Types.FLOAT).value(value)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of INT type.
+ *
+ * @param value INT value
+ */
+ public ClassInstance parameter(int value) {
+ constructor.add(Left(new LiteralValue().of(Types.INT).value(value)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of VARCHAR type.
+ *
+ * @param value VARCHAR value
+ */
+ public ClassInstance parameter(String value) {
+ constructor.add(Left(new LiteralValue().of(Types.STRING).value(value)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of BIGINT type.
+ *
+ * @param value BIGINT value
+ */
+ public ClassInstance parameter(long value) {
+ constructor.add(Left(new LiteralValue().of(Types.LONG).value(value)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of TINYINT type.
+ *
+ * @param value TINYINT value
+ */
+ public ClassInstance parameter(byte value) {
+ constructor.add(Left(new LiteralValue().of(Types.BYTE).value(value)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of SMALLINT type.
+ *
+ * @param value SMALLINT value
+ */
+ public ClassInstance parameter(short value) {
+ constructor.add(Left(new LiteralValue().of(Types.SHORT).value(value)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of DECIMAL type.
+ *
+ * @param value DECIMAL value
+ */
+ public ClassInstance parameter(BigDecimal value) {
+ constructor.add(Left(new LiteralValue().of(Types.BIG_DEC).value(value)));
+ return this;
+ }
+
+ /**
+ * Adds a constructor parameter value of a class instance (i.e. a Java object with a public
+ * constructor).
+ *
+ * @param classInstance description of a class instance (i.e. a Java object with a public
+ * constructor).
+ */
+ public ClassInstance parameter(ClassInstance classInstance) {
+ constructor.add(Right(classInstance));
+ return this;
+ }
+
+ /**
+ * Converts this descriptor into a set of properties.
+ */
+ @Override
+ public Map<String, String> toProperties() {
+ DescriptorProperties properties = new DescriptorProperties();
+ addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties);
+ return properties.asMap();
+ }
+
+ /**
+ * Internal method for properties conversion.
+ */
+ @Override
+ public void addPropertiesWithPrefix(String keyPrefix, DescriptorProperties properties) {
+ if (className != null) {
+ properties.putString(keyPrefix + ClassInstanceValidator.CLASS, className);
+ }
+ for (int i = 0; i < constructor.size(); ++i) {
+ Either<LiteralValue, ClassInstance> either = constructor.get(i);
+ String keyPrefixWithIdx = keyPrefix + ClassInstanceValidator.CONSTRUCTOR + "." + i + ".";
+ if (either.isLeft()) {
+ either.left().addPropertiesWithPrefix(keyPrefixWithIdx, properties);
+ } else {
+ either.right().addPropertiesWithPrefix(keyPrefixWithIdx, properties);
+ }
+ }
+ }
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstanceValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstanceValidator.java
new file mode 100644
index 0000000..a5abebb
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstanceValidator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Validator for {@link ClassInstance}.
+ */
+@Internal
+public class ClassInstanceValidator extends HierarchyDescriptorValidator {
+
+ public static final String CLASS = "class";
+ public static final String CONSTRUCTOR = "constructor";
+
+ /**
+ * @param keyPrefix prefix to be added to every property before validation
+ */
+ public ClassInstanceValidator(String keyPrefix) {
+ super(keyPrefix);
+ }
+
+ public ClassInstanceValidator() {
+ this(EMPTY_PREFIX);
+ }
+
+ @Override
+ protected void validateWithPrefix(String keyPrefix, DescriptorProperties properties) {
+ // check class name
+ properties.validateString(keyPrefix + CLASS, false, 1);
+
+ // check constructor
+ String constructorPrefix = keyPrefix + CONSTRUCTOR;
+
+ List<Map<String, String>> constructorProperties =
+ properties.getVariableIndexedProperties(constructorPrefix, new ArrayList<>());
+ for (int i = 0; i < constructorProperties.size(); ++i) {
+ String keyPrefixWithIdx = constructorPrefix + "." + i + ".";
+ if (constructorProperties.get(i).containsKey(ClassInstanceValidator.CLASS)) {
+ ClassInstanceValidator classInstanceValidator = new ClassInstanceValidator(keyPrefixWithIdx);
+ classInstanceValidator.validate(properties);
+ }
+ // literal value
+ else {
+ LiteralValueValidator primitiveValueValidator = new LiteralValueValidator(keyPrefixWithIdx);
+ primitiveValueValidator.validate(properties);
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptor.java
new file mode 100644
index 0000000..72e1068
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**
+ * Descriptor for describing a function.
+ */
+@PublicEvolving
+public class FunctionDescriptor implements Descriptor {
+ private String from;
+ private ClassInstance classInstance;
+
+ /**
+ * Creates a function from a class description.
+ */
+ public FunctionDescriptor fromClass(ClassInstance classType) {
+ from = FunctionDescriptorValidator.FROM_VALUE_CLASS;
+ this.classInstance = classType;
+ return this;
+ }
+
+ /**
+ * Converts this descriptor into a set of properties.
+ */
+ @Override
+ public Map<String, String> toProperties() {
+ DescriptorProperties properties = new DescriptorProperties();
+ if (from != null) {
+ properties.putString(FunctionDescriptorValidator.FROM, from);
+ }
+ if (classInstance != null) {
+ properties.putProperties(classInstance.toProperties());
+ }
+ return properties.asMap();
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptorValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptorValidator.java
new file mode 100644
index 0000000..059be19
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptorValidator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Validator for {@link FunctionDescriptor}.
+ */
+@Internal
+public class FunctionDescriptorValidator implements DescriptorValidator {
+
+ public static final String FROM = "from";
+ public static final String FROM_VALUE_CLASS = "class";
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ Map<String, Consumer<String>> enumValidation = new HashMap<>();
+ enumValidation.put(FROM_VALUE_CLASS, s -> new ClassInstanceValidator().validate(properties));
+
+ // check for 'from'
+ if (properties.containsKey(FROM)) {
+ properties.validateEnum(FROM, false, enumValidation);
+ } else {
+ throw new ValidationException("Could not find 'from' property for function.");
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptor.java
similarity index 61%
rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptor.java
index b958b1c..a8790d7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptor.java
@@ -16,20 +16,23 @@
* limitations under the License.
*/
-package org.apache.flink.table.descriptors
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
/**
- * A descriptor that may exist in an arbitrary level (be recursively included by other
- * descriptors).
- */
-abstract class HierarchyDescriptor extends Descriptor {
+ * A descriptor that may exist in an arbitrary level (be recursively included by other
+ * descriptors).
+ */
+@PublicEvolving
+public abstract class HierarchyDescriptor implements Descriptor {
- /**
- * Internal method for properties conversion. All the property keys will be prefixed with the
- * given key prefix.
- */
- private[flink] def addPropertiesWithPrefix(
- keyPrefix: String,
- properties: DescriptorProperties): Unit
+ /**
+ * Internal method for properties conversion. All the property keys will be prefixed with the
+ * given key prefix.
+ */
+ public abstract void addPropertiesWithPrefix(
+ String keyPrefix,
+ DescriptorProperties properties);
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.java
similarity index 52%
rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.java
index d7bf573..c52cebf 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.java
@@ -16,26 +16,34 @@
* limitations under the License.
*/
-package org.apache.flink.table.descriptors
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
/**
- * Validator for a [[HierarchyDescriptor]].
- *
- * @param keyPrefix prefix to be added to every property before validation
- */
-abstract class HierarchyDescriptorValidator(keyPrefix: String) extends DescriptorValidator{
+ * Validator for a {@link HierarchyDescriptor}.
+ */
+@Internal
+public abstract class HierarchyDescriptorValidator implements DescriptorValidator {
- final def validate(properties: DescriptorProperties): Unit = {
- validateWithPrefix(keyPrefix, properties)
- }
+ public static final String EMPTY_PREFIX = "";
- /**
- * Performs validation with a prefix for the keys.
- */
- protected def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit
+ private String keyPrefix;
-}
+ /**
+ * @param keyPrefix prefix to be added to every property before validation
+ */
+ public HierarchyDescriptorValidator(String keyPrefix) {
+ this.keyPrefix = keyPrefix;
+ }
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ validateWithPrefix(keyPrefix, properties);
+ }
-object HierarchyDescriptorValidator {
- val EMPTY_PREFIX = ""
+ /**
+ * Performs validation with a prefix for the keys.
+ */
+ protected abstract void validateWithPrefix(String keyPrefix, DescriptorProperties properties);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValue.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValue.java
new file mode 100644
index 0000000..8801169
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValue.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.TypeStringUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+/**
+ * Descriptor for a literal value. A literal value consists of a type and the actual value.
+ * Expression values are not allowed.
+ *
+ * <p>If no type is set, the type is automatically derived from the value. Currently,
+ * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+ *
+ * <p>Examples:
+ * - "true", "false" -> BOOLEAN
+ * - "42", "-5" -> INT
+ * - "2.0", "1234.222" -> DOUBLE
+ * - VARCHAR otherwise
+ */
+@PublicEvolving
+public class LiteralValue extends HierarchyDescriptor {
+ private String typeInfo;
+ private Object value;
+
+ /**
+ * Type information of the literal value. E.g. Types.BOOLEAN.
+ *
+ * @param typeInfo type information describing the value
+ */
+ public LiteralValue of(TypeInformation<?> typeInfo) {
+ Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
+ this.typeInfo = TypeStringUtils.writeTypeInfo(typeInfo);
+ return this;
+ }
+
+ /**
+ * Type string of the literal value. E.g. "BOOLEAN".
+ *
+ * @param typeString type string describing the value
+ */
+ public LiteralValue of(String typeString) {
+ this.typeInfo = typeString;
+ return this;
+ }
+
+ /**
+ * Literal BOOLEAN value.
+ *
+ * @param value literal BOOLEAN value
+ */
+ public LiteralValue value(boolean value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Literal INT value.
+ *
+ * @param value literal INT value
+ */
+ public LiteralValue value(int value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Literal DOUBLE value.
+ *
+ * @param value literal DOUBLE value
+ */
+ public LiteralValue value(double value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Literal FLOAT value.
+ *
+ * @param value literal FLOAT value
+ */
+ public LiteralValue value(float value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Literal value either for an explicit VARCHAR type or automatically derived type.
+ *
+ * <p>If no type is set, the type is automatically derived from the value. Currently,
+ * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+ *
+ * @param value literal value
+ */
+ public LiteralValue value(String value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Literal BIGINT value.
+ *
+ * @param value literal BIGINT value
+ */
+ public LiteralValue value(long value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Literal TINYINT value.
+ *
+ * @param value literal TINYINT value
+ */
+ public LiteralValue value(byte value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Literal SMALLINT value.
+ *
+ * @param value literal SMALLINT value
+ */
+ public LiteralValue value(short value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Literal DECIMAL value.
+ *
+ * @param value literal DECIMAL value
+ */
+ public LiteralValue value(BigDecimal value) {
+ this.value = value;
+ return this;
+ }
+
+ @Override
+ public Map<String, String> toProperties() {
+ DescriptorProperties properties = new DescriptorProperties();
+ addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties);
+ return properties.asMap();
+ }
+
+ @Override
+ public void addPropertiesWithPrefix(String keyPrefix, DescriptorProperties properties) {
+ if (typeInfo != null) {
+ properties.putString(keyPrefix + "type", typeInfo);
+ if (value != null) {
+ properties.putString(keyPrefix + "value", String.valueOf(value));
+ }
+ } else {
+ // do not allow values in top-level
+ if (keyPrefix.equals(HierarchyDescriptorValidator.EMPTY_PREFIX)) {
+ throw new ValidationException(
+ "Literal values with implicit type must not exist in the top level of a hierarchy.");
+ }
+ if (value != null) {
+ properties.putString(keyPrefix.substring(0, keyPrefix.length() - 1), String.valueOf(value));
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValueValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValueValidator.java
new file mode 100644
index 0000000..ffd6503
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValueValidator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+
+/**
+ * Validator for {@link LiteralValue}.
+ */
+@Internal
+public class LiteralValueValidator extends HierarchyDescriptorValidator {
+
+ public static final String TYPE = "type";
+ public static final String VALUE = "value";
+
+ /*
+ * TODO The following types need to be supported next.
+ * Types.SQL_DATE
+ * Types.SQL_TIME
+ * Types.SQL_TIMESTAMP
+ * Types.PRIMITIVE_ARRAY
+ * Types.OBJECT_ARRAY
+ * Types.MAP
+ * Types.MULTISET
+ * null
+ */
+
+ /**
+ * Gets the value according to the type and value strings.
+ *
+ * @param keyPrefix the prefix of the literal type key
+ * @param properties the descriptor properties
+ * @return the derived value
+ */
+ public static Object getValue(String keyPrefix, DescriptorProperties properties) {
+ String typeKey = keyPrefix + TYPE;
+ // explicit type
+ if (properties.containsKey(typeKey)) {
+ String valueKey = keyPrefix + VALUE;
+ TypeInformation<?> typeInfo = properties.getType(typeKey);
+
+ if (typeInfo.equals(Types.BIG_DEC)) {
+ return properties.getBigDecimal(valueKey);
+ } else if (typeInfo.equals(Types.BOOLEAN)) {
+ return properties.getBoolean(valueKey);
+ } else if (typeInfo.equals(Types.BYTE)) {
+ return properties.getByte(valueKey);
+ } else if (typeInfo.equals(Types.DOUBLE)) {
+ return properties.getDouble(valueKey);
+ } else if (typeInfo.equals(Types.FLOAT)) {
+ return properties.getFloat(valueKey);
+ } else if (typeInfo.equals(Types.INT)) {
+ return properties.getInt(valueKey);
+ } else if (typeInfo.equals(Types.LONG)) {
+ return properties.getLong(valueKey);
+ } else if (typeInfo.equals(Types.SHORT)) {
+ return properties.getShort(valueKey);
+ } else if (typeInfo.equals(Types.STRING)) {
+ return properties.getString(valueKey);
+ } else {
+ throw new TableException("Unsupported type '" + typeInfo.getTypeClass() + "'.");
+ }
+ }
+ // implicit type
+ else {
+ return deriveTypeStringFromValueString(
+ properties.getString(keyPrefix.substring(0, keyPrefix.length() - 1)));
+ }
+ }
+
+ /**
+ * Tries to derive a literal value from the given string value.
+ * The derivation priority for the types are BOOLEAN, INT, DOUBLE, and VARCHAR.
+ *
+ * @param valueString the string formatted value
+ * @return parsed value
+ */
+ private static Object deriveTypeStringFromValueString(String valueString) {
+ if (valueString.equals("true") || valueString.equals("false")) {
+ return Boolean.valueOf(valueString);
+ } else {
+ try {
+ return Integer.valueOf(valueString);
+ } catch (NumberFormatException e1) {
+ try {
+ return Double.valueOf(valueString);
+ } catch (NumberFormatException e2) {
+ return valueString;
+ }
+ }
+ }
+ }
+
+ /**
+ * @param keyPrefix prefix to be added to every property before validation
+ */
+ public LiteralValueValidator(String keyPrefix) {
+ super(keyPrefix);
+ }
+
+ @Override
+ protected void validateWithPrefix(String keyPrefix, DescriptorProperties properties) {
+ String typeKey = keyPrefix + TYPE;
+ properties.validateType(typeKey, true, false);
+
+ // explicit type
+ if (properties.containsKey(typeKey)) {
+ String valueKey = keyPrefix + VALUE;
+ TypeInformation<?> typeInfo = properties.getType(typeKey);
+ if (typeInfo.equals(Types.BIG_DEC)) {
+ properties.validateBigDecimal(valueKey, false);
+ } else if (typeInfo.equals(Types.BOOLEAN)) {
+ properties.validateBoolean(valueKey, false);
+ } else if (typeInfo.equals(Types.BYTE)) {
+ properties.validateByte(valueKey, false);
+ } else if (typeInfo.equals(Types.DOUBLE)) {
+ properties.validateDouble(valueKey, false);
+ } else if (typeInfo.equals(Types.FLOAT)) {
+ properties.validateFloat(valueKey, false);
+ } else if (typeInfo.equals(Types.INT)) {
+ properties.validateInt(valueKey, false);
+ } else if (typeInfo.equals(Types.LONG)) {
+ properties.validateLong(valueKey, false);
+ } else if (typeInfo.equals(Types.SHORT)) {
+ properties.validateShort(valueKey, false);
+ } else if (typeInfo.equals(Types.STRING)) {
+ properties.validateString(valueKey, false);
+ } else {
+ throw new TableException("Unsupported type '" + typeInfo + "'.");
+ }
+ }
+ // implicit type
+ else {
+ // do not allow values in top-level
+ if (keyPrefix.equals(HierarchyDescriptorValidator.EMPTY_PREFIX)) {
+ throw new ValidationException(
+ "Literal values with implicit type must not exist in the top level of a hierarchy.");
+ }
+ properties.validateString(keyPrefix.substring(0, keyPrefix.length() - 1), false);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
deleted file mode 100644
index 34ed6dd..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.Types
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
- * with a public constructor (with or without parameters).
- */
-class ClassInstance extends HierarchyDescriptor {
-
- private var className: Option[String] = None
-
- // the parameter is either a literal value or the instance of a class
- private val constructor: ArrayBuffer[Either[LiteralValue, ClassInstance]] = ArrayBuffer()
-
- /**
- * Sets the fully qualified class name for creating an instance.
- *
- * E.g. org.example.MyClass or org.example.MyClass$StaticInnerClass
- *
- * @param className fully qualified class name
- */
- def of(className: String): ClassInstance = {
- this.className = Option(className)
- this
- }
-
- /**
- * Adds a constructor parameter value of literal type. The type is automatically derived from
- * the value. Currently, this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. Expression
- * values are not allowed.
- *
- * Examples:
- * - "true", "false" -> BOOLEAN
- * - "42", "-5" -> INT
- * - "2.0", "1234.222" -> DOUBLE
- * - VARCHAR otherwise
- *
- * For other types and explicit type declaration use [[parameter(String, String)]] or
- * [[parameter(TypeInformation, String)]].
- *
- */
- def parameterString(valueString: String): ClassInstance = {
- constructor += Left(new LiteralValue().value(valueString))
- this
- }
-
- /**
- * Adds a constructor parameter value of literal type. The type is explicitly defined using a
- * type string such as VARCHAR, FLOAT, BOOLEAN, INT, BIGINT, etc. The value is parsed
- * accordingly. Expression values are not allowed.
- *
- * @param typeString the type string that define how to parse the given value string
- * @param valueString the literal value to be parsed
- */
- def parameter(typeString: String, valueString: String): ClassInstance = {
- constructor += Left(new LiteralValue().of(typeString).value(valueString))
- this
- }
-
- /**
- * Adds a constructor parameter value of literal type. The type is explicitly defined using
- * type information. The value is parsed accordingly. Expression values are not allowed.
- *
- * @param typeInfo the type that define how to parse the given value string
- * @param valueString the literal value to be parsed
- */
- def parameter(typeInfo: TypeInformation[_], valueString: String): ClassInstance = {
- constructor += Left(new LiteralValue().of(typeInfo).value(valueString))
- this
- }
-
- /**
- * Adds a constructor parameter value of BOOLEAN type.
- *
- * @param value BOOLEAN value
- */
- def parameter(value: Boolean): ClassInstance = {
- constructor += Left(new LiteralValue().of(Types.BOOLEAN).value(value))
- this
- }
-
- /**
- * Adds a constructor parameter value of DOUBLE type.
- *
- * @param value DOUBLE value
- */
- def parameter(value: Double): ClassInstance = {
- constructor += Left(new LiteralValue().of(Types.DOUBLE).value(value))
- this
- }
-
- /**
- * Adds a constructor parameter value of FLOAT type.
- *
- * @param value FLOAT value
- */
- def parameter(value: Float): ClassInstance = {
- constructor += Left(new LiteralValue().of(Types.FLOAT).value(value))
- this
- }
-
- /**
- * Adds a constructor parameter value of INT type.
- *
- * @param value INT value
- */
- def parameter(value: Int): ClassInstance = {
- constructor += Left(new LiteralValue().of(Types.INT).value(value))
- this
- }
-
- /**
- * Adds a constructor parameter value of VARCHAR type.
- *
- * @param value VARCHAR value
- */
- def parameter(value: String): ClassInstance = {
- constructor += Left(new LiteralValue().of(Types.STRING).value(value))
- this
- }
-
- /**
- * Adds a constructor parameter value of BIGINT type.
- *
- * @param value BIGINT value
- */
- def parameter(value: Long): ClassInstance = {
- constructor += Left(new LiteralValue().of(Types.LONG).value(value))
- this
- }
-
- /**
- * Adds a constructor parameter value of TINYINT type.
- *
- * @param value TINYINT value
- */
- def parameter(value: Byte): ClassInstance = {
- constructor += Left(new LiteralValue().of(Types.BYTE).value(value))
- this
- }
-
- /**
- * Adds a constructor parameter value of SMALLINT type.
- *
- * @param value SMALLINT value
- */
- def parameter(value: Short): ClassInstance = {
- constructor += Left(new LiteralValue().of(Types.SHORT).value(value))
- this
- }
-
- /**
- * Adds a constructor parameter value of DECIMAL type.
- *
- * @param value DECIMAL value
- */
- def parameter(value: java.math.BigDecimal): ClassInstance = {
- constructor += Left(new LiteralValue().of(Types.DECIMAL).value(value))
- this
- }
-
- /**
- * Adds a constructor parameter value of a class instance (i.e. a Java object with a public
- * constructor).
- *
- * @param classInstance description of a class instance (i.e. a Java object with a public
- * constructor).
- */
- def parameter(classInstance: ClassInstance): ClassInstance = {
- constructor += Right(classInstance)
- this
- }
-
- /**
- * Converts this descriptor into a set of properties.
- */
- override def toProperties: util.Map[String, String] = {
- val properties = new DescriptorProperties()
- addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties)
- properties.asMap()
- }
-
- /**
- * Internal method for properties conversion.
- */
- override private[flink] def addPropertiesWithPrefix(
- keyPrefix: String,
- properties: DescriptorProperties): Unit = {
- className.foreach(properties.putString(s"$keyPrefix${ClassInstanceValidator.CLASS}", _))
- var i = 0
- while (i < constructor.size) {
- constructor(i) match {
- case Left(literalValue) =>
- literalValue.addPropertiesWithPrefix(
- s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
- properties)
- case Right(classInstance) =>
- classInstance.addPropertiesWithPrefix(
- s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
- properties)
- }
- i += 1
- }
- }
-}
-
-/**
- * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
- * with a public constructor (with or without parameters).
- */
-object ClassInstance {
-
- /**
- * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
- * with a public constructor (with or without parameters).
- *
- * @deprecated Use `new ClassInstance()`.
- */
- @deprecated
- def apply() = new ClassInstance
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
deleted file mode 100644
index 43da1e1..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.HierarchyDescriptorValidator.EMPTY_PREFIX
-
-import scala.collection.JavaConversions._
-
-/**
- * Validator for [[ClassInstance]].
- */
-class ClassInstanceValidator(keyPrefix: String = EMPTY_PREFIX)
- extends HierarchyDescriptorValidator(keyPrefix) {
-
- override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
- // check class name
- properties.validateString(s"$keyPrefix${ClassInstanceValidator.CLASS}", false, 1)
-
- // check constructor
- val constructorPrefix = s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}"
-
- val constructorProperties = properties.getVariableIndexedProperties(constructorPrefix, List())
- var i = 0
- while (i < constructorProperties.size()) {
- // nested class instance
- if (constructorProperties(i).containsKey(ClassInstanceValidator.CLASS)) {
- val classInstanceValidator = new ClassInstanceValidator(s"$constructorPrefix.$i.")
- classInstanceValidator.validate(properties)
- }
- // literal value
- else {
- val primitiveValueValidator = new LiteralValueValidator(s"$constructorPrefix.$i.")
- primitiveValueValidator.validate(properties)
- }
- i += 1
- }
- }
-}
-
-object ClassInstanceValidator {
- val CLASS = "class"
- val CONSTRUCTOR = "constructor"
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
deleted file mode 100644
index 59aef11..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-import java.util
-
-/**
- * Descriptor for describing a function.
- */
-class FunctionDescriptor extends Descriptor {
-
- private var from: Option[String] = None
- private var classInstance: Option[ClassInstance] = None
-
- /**
- * Creates a function from a class description.
- */
- def fromClass(classType: ClassInstance): FunctionDescriptor = {
- from = Some(FunctionDescriptorValidator.FROM_VALUE_CLASS)
- this.classInstance = Option(classType)
- this
- }
-
- /**
- * Converts this descriptor into a set of properties.
- */
- override def toProperties: util.Map[String, String] = {
- val properties = new DescriptorProperties()
- from.foreach(properties.putString(FunctionDescriptorValidator.FROM, _))
- classInstance.foreach(d => properties.putProperties(d.toProperties))
- properties.asMap()
- }
-}
-
-/**
- * Descriptor for describing a function.
- */
-object FunctionDescriptor {
-
- /**
- * Descriptor for describing a function.
- *
- * @deprecated Use `new FunctionDescriptor()`.
- */
- @deprecated
- def apply(): FunctionDescriptor = new FunctionDescriptor()
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
deleted file mode 100644
index db8a1b5..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.FunctionDescriptorValidator.FROM
-import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
-
-import scala.collection.JavaConverters._
-
-/**
- * Validator for [[FunctionDescriptor]].
- */
-class FunctionDescriptorValidator extends DescriptorValidator {
-
- override def validate(properties: DescriptorProperties): Unit = {
-
- val classValidation = (_: String) => {
- new ClassInstanceValidator().validate(properties)
- }
-
- // check for 'from'
- if (properties.containsKey(FROM)) {
- properties.validateEnum(
- FROM,
- false,
- Map(
- FunctionDescriptorValidator.FROM_VALUE_CLASS -> toJava(classValidation)
- ).asJava
- )
- } else {
- throw new ValidationException("Could not find 'from' property for function.")
- }
- }
-}
-
-object FunctionDescriptorValidator {
-
- val FROM = "from"
- val FROM_VALUE_CLASS = "class"
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
deleted file mode 100644
index 62bd6f8..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.utils.TypeStringUtils
-import org.apache.flink.util.Preconditions
-
-/**
- * Descriptor for a literal value. A literal value consists of a type and the actual value.
- * Expression values are not allowed.
- *
- * If no type is set, the type is automatically derived from the value. Currently,
- * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
- *
- * Examples:
- * - "true", "false" -> BOOLEAN
- * - "42", "-5" -> INT
- * - "2.0", "1234.222" -> DOUBLE
- * - VARCHAR otherwise
- */
-class LiteralValue extends HierarchyDescriptor {
-
- var typeInfo: Option[String] = None
- var value: Option[Any] = None
-
- /**
- * Type information of the literal value. E.g. Types.BOOLEAN.
- *
- * @param typeInfo type information describing the value
- */
- def of(typeInfo: TypeInformation[_]): LiteralValue = {
- Preconditions.checkNotNull("Type information must not be null.")
- this.typeInfo = Option(TypeStringUtils.writeTypeInfo(typeInfo))
- this
- }
-
- /**
- * Type string of the literal value. E.g. "BOOLEAN".
- *
- * @param typeString type string describing the value
- */
- def of(typeString: String): LiteralValue = {
- this.typeInfo = Option(typeString)
- this
- }
-
- /**
- * Literal BOOLEAN value.
- *
- * @param value literal BOOLEAN value
- */
- def value(value: Boolean): LiteralValue = {
- this.value = Option(value)
- this
- }
-
- /**
- * Literal INT value.
- *
- * @param value literal INT value
- */
- def value(value: Int): LiteralValue = {
- this.value = Option(value)
- this
- }
-
- /**
- * Literal DOUBLE value.
- *
- * @param value literal DOUBLE value
- */
- def value(value: Double): LiteralValue = {
- this.value = Option(value)
- this
- }
-
- /**
- * Literal FLOAT value.
- *
- * @param value literal FLOAT value
- */
- def value(value: Float): LiteralValue = {
- this.value = Option(value)
- this
- }
-
- /**
- * Literal value either for an explicit VARCHAR type or automatically derived type.
- *
- * If no type is set, the type is automatically derived from the value. Currently,
- * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
- *
- * @param value literal value
- */
- def value(value: String): LiteralValue = {
- this.value = Option(value)
- this
- }
-
- /**
- * Literal BIGINT value.
- *
- * @param value literal BIGINT value
- */
- def value(value: Long): LiteralValue = {
- this.value = Option(value)
- this
- }
-
- /**
- * Literal TINYINT value.
- *
- * @param value literal TINYINT value
- */
- def value(value: Byte): LiteralValue = {
- this.value = Option(value)
- this
- }
-
- /**
- * Literal SMALLINT value.
- *
- * @param value literal SMALLINT value
- */
- def value(value: Short): LiteralValue = {
- this.value = Option(value)
- this
- }
-
- /**
- * Literal DECIMAL value.
- *
- * @param value literal DECIMAL value
- */
- def value(value: java.math.BigDecimal): LiteralValue = {
- this.value = Option(value)
- this
- }
-
- /**
- * Converts this descriptor into a set of properties.
- */
- override def toProperties: util.Map[String, String] = {
- val properties = new DescriptorProperties()
- addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties)
- properties.asMap()
- }
-
- /**
- * Internal method for properties conversion.
- */
- override private[flink] def addPropertiesWithPrefix(
- keyPrefix: String,
- properties: DescriptorProperties)
- : Unit = {
-
- typeInfo match {
- // explicit type
- case Some(ti) =>
- properties.putString(keyPrefix + "type", ti)
- value.foreach(v => properties.putString(keyPrefix + "value", String.valueOf(v)))
- // implicit type
- case None =>
- // do not allow values in top-level
- if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) {
- throw new ValidationException(
- "Literal values with implicit type must not exist in the top level of a hierarchy.")
- }
- value.foreach { v =>
- properties.putString(keyPrefix.substring(0, keyPrefix.length - 1), String.valueOf(v))
- }
- }
- }
-}
-
-/**
- * Descriptor for a literal value. A literal value consists of a type and the actual value.
- * Expression values are not allowed.
- *
- * If no type is set, the type is automatically derived from the value. Currently,
- * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
- *
- * Examples:
- * - "true", "false" -> BOOLEAN
- * - "42", "-5" -> INT
- * - "2.0", "1234.222" -> DOUBLE
- * - VARCHAR otherwise
- */
-object LiteralValue {
-
- /**
- * Descriptor for a literal value. A literal value consists of a type and the actual value.
- * Expression values are not allowed.
- *
- * If no type is set, the type is automatically derived from the value. Currently,
- * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
- *
- * Examples:
- * - "true", "false" -> BOOLEAN
- * - "42", "-5" -> INT
- * - "2.0", "1234.222" -> DOUBLE
- * - VARCHAR otherwise
- *
- * @deprecated Use `new Literal()`.
- */
- @deprecated
- def apply() = new LiteralValue()
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
deleted file mode 100644
index 4e85858..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt}
-
-import org.apache.flink.api.common.typeinfo.Types
-import org.apache.flink.table.api.{TableException, ValidationException}
-
-/**
- * Validator for [[LiteralValue]].
- */
-class LiteralValueValidator(keyPrefix: String) extends HierarchyDescriptorValidator(keyPrefix) {
-
- /*
- * TODO The following types need to be supported next.
- * Types.SQL_DATE
- * Types.SQL_TIME
- * Types.SQL_TIMESTAMP
- * Types.PRIMITIVE_ARRAY
- * Types.OBJECT_ARRAY
- * Types.MAP
- * Types.MULTISET
- * null
- */
-
- override protected def validateWithPrefix(
- keyPrefix: String,
- properties: DescriptorProperties)
- : Unit = {
-
- val typeKey = s"$keyPrefix${LiteralValueValidator.TYPE}"
-
- properties.validateType(typeKey, true, false)
-
- // explicit type
- if (properties.containsKey(typeKey)) {
- val valueKey = s"$keyPrefix${LiteralValueValidator.VALUE}"
- val typeInfo = properties.getType(typeKey)
- typeInfo match {
- case Types.BIG_DEC => properties.validateBigDecimal(valueKey, false)
- case Types.BOOLEAN => properties.validateBoolean(valueKey, false)
- case Types.BYTE => properties.validateByte(valueKey, false)
- case Types.DOUBLE => properties.validateDouble(valueKey, false)
- case Types.FLOAT => properties.validateFloat(valueKey, false)
- case Types.INT => properties.validateInt(valueKey, false)
- case Types.LONG => properties.validateLong(valueKey, false)
- case Types.SHORT => properties.validateShort(valueKey, false)
- case Types.STRING => properties.validateString(valueKey, false)
- case _ => throw new TableException(s"Unsupported type '$typeInfo'.")
- }
- }
- // implicit type
- else {
- // do not allow values in top-level
- if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) {
- throw new ValidationException(
- "Literal values with implicit type must not exist in the top level of a hierarchy.")
- }
- properties.validateString(keyPrefix.substring(0, keyPrefix.length - 1), false)
- }
- }
-}
-
-object LiteralValueValidator {
- val TYPE = "type"
- val VALUE = "value"
-
- private val LITERAL_FALSE = "false"
- private val LITERAL_TRUE = "true"
-
- /**
- * Gets the value according to the type and value strings.
- *
- * @param keyPrefix the prefix of the literal type key
- * @param properties the descriptor properties
- * @return the derived value
- */
- def getValue(keyPrefix: String, properties: DescriptorProperties): Any = {
- val typeKey = s"$keyPrefix$TYPE"
- // explicit type
- if (properties.containsKey(typeKey)) {
- val valueKey = s"$keyPrefix$VALUE"
- val typeInfo = properties.getType(typeKey)
- typeInfo match {
- case Types.BIG_DEC => properties.getBigDecimal(valueKey)
- case Types.BOOLEAN => properties.getBoolean(valueKey)
- case Types.BYTE => properties.getByte(valueKey)
- case Types.DOUBLE => properties.getDouble(valueKey)
- case Types.FLOAT => properties.getFloat(valueKey)
- case Types.INT => properties.getInt(valueKey)
- case Types.LONG => properties.getLong(valueKey)
- case Types.SHORT => properties.getShort(valueKey)
- case Types.STRING => properties.getString(valueKey)
- case _ => throw new TableException(s"Unsupported type '${typeInfo.getTypeClass}'.")
- }
- }
- // implicit type
- else {
- deriveTypeStringFromValueString(
- properties.getString(keyPrefix.substring(0, keyPrefix.length - 1)))
- }
- }
-
- /**
- * Tries to derive a literal value from the given string value.
- * The derivation priority for the types are BOOLEAN, INT, DOUBLE, and VARCHAR.
- *
- * @param valueString the string formatted value
- * @return parsed value
- */
- def deriveTypeStringFromValueString(valueString: String): AnyRef = {
- if (valueString.equals(LITERAL_TRUE) || valueString.equals(LITERAL_FALSE)) {
- JBoolean.valueOf(valueString)
- } else {
- try {
- JInt.valueOf(valueString)
- } catch {
- case _: NumberFormatException =>
- try {
- JDouble.valueOf(valueString)
- } catch {
- case _: NumberFormatException =>
- valueString
- }
- }
- }
- }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala
index 8b75e15..fdb0ccf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala
@@ -33,25 +33,25 @@ class ClassInstanceTest extends DescriptorTestBase {
}
override def descriptors(): JList[Descriptor] = {
- val desc1 = ClassInstance()
+ val desc1 = new ClassInstance()
.of("class1")
.parameter(Types.LONG, "1")
.parameter(
- ClassInstance()
+ new ClassInstance()
.of("class2")
.parameter(
- ClassInstance()
+ new ClassInstance()
.of("class3")
.parameterString("StarryNight")
.parameter(
- ClassInstance()
+ new ClassInstance()
.of("class4"))))
.parameter(2L)
- val desc2 = ClassInstance()
+ val desc2 = new ClassInstance()
.of("class2")
- val desc3 = ClassInstance()
+ val desc3 = new ClassInstance()
.of("org.example.Function")
.parameter(42)
.parameter(2.asInstanceOf[Byte])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala
index 1cd763f..a541850 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala
@@ -28,13 +28,13 @@ import scala.collection.JavaConverters._
class FunctionDescriptorTest extends DescriptorTestBase {
override def descriptors(): JList[Descriptor] = {
- val desc1 = FunctionDescriptor()
+ val desc1 = new FunctionDescriptor()
.fromClass(
- ClassInstance()
+ new ClassInstance()
.of("my.class")
.parameter("INT", "1")
.parameter(
- ClassInstance()
+ new ClassInstance()
.of("my.class2")
.parameterString("true")))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala
index 271d563..8f633b1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala
@@ -43,15 +43,15 @@ class LiteralValueTest extends DescriptorTestBase {
}
override def descriptors(): JList[Descriptor] = {
- val bigDecimalDesc = LiteralValue().of(Types.DECIMAL).value(new JBigDecimal(1))
- val booleanDesc = LiteralValue().of(Types.BOOLEAN).value(false)
- val byteDesc = LiteralValue().of(Types.BYTE).value(4.asInstanceOf[Byte])
- val doubleDesc = LiteralValue().of(Types.DOUBLE).value(7.0)
- val floatDesc = LiteralValue().of(Types.FLOAT).value(8.0f)
- val intDesc = LiteralValue().of(Types.INT).value(9)
- val longDesc = LiteralValue().of(Types.LONG).value(10L)
- val shortDesc = LiteralValue().of(Types.SHORT).value(11.asInstanceOf[Short])
- val stringDesc = LiteralValue().of(Types.STRING).value("12")
+ val bigDecimalDesc = new LiteralValue().of(Types.DECIMAL).value(new JBigDecimal(1))
+ val booleanDesc = new LiteralValue().of(Types.BOOLEAN).value(false)
+ val byteDesc = new LiteralValue().of(Types.BYTE).value(4.asInstanceOf[Byte])
+ val doubleDesc = new LiteralValue().of(Types.DOUBLE).value(7.0)
+ val floatDesc = new LiteralValue().of(Types.FLOAT).value(8.0f)
+ val intDesc = new LiteralValue().of(Types.INT).value(9)
+ val longDesc = new LiteralValue().of(Types.LONG).value(10L)
+ val shortDesc = new LiteralValue().of(Types.SHORT).value(11.asInstanceOf[Short])
+ val stringDesc = new LiteralValue().of(Types.STRING).value("12")
// for tests with implicit type see ClassInstanceTest because literal value are not
// supported in the top level of a hierarchy
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
index bcef012..2ffc423 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
@@ -31,8 +31,8 @@ class FunctionServiceTest {
@Test(expected = classOf[ValidationException])
def testWrongArgsFunctionCreation(): Unit = {
- val descriptor = FunctionDescriptor()
- .fromClass(ClassInstance()
+ val descriptor = new FunctionDescriptor()
+ .fromClass(new ClassInstance()
.of(classOf[NoArgClass].getName)
.parameterString("12"))
@@ -41,24 +41,24 @@ class FunctionServiceTest {
@Test(expected = classOf[ValidationException])
def testPrivateFunctionCreation(): Unit = {
- val descriptor = FunctionDescriptor()
- .fromClass(ClassInstance().of(classOf[PrivateClass].getName))
+ val descriptor = new FunctionDescriptor()
+ .fromClass(new ClassInstance().of(classOf[PrivateClass].getName))
FunctionService.createFunction(descriptor)
}
@Test(expected = classOf[ValidationException])
def testInvalidClassFunctionCreation(): Unit = {
- val descriptor = FunctionDescriptor()
- .fromClass(ClassInstance().of("this.class.does.not.exist"))
+ val descriptor = new FunctionDescriptor()
+ .fromClass(new ClassInstance().of("this.class.does.not.exist"))
FunctionService.createFunction(descriptor)
}
@Test(expected = classOf[ValidationException])
def testNotFunctionClassFunctionCreation(): Unit = {
- val descriptor = FunctionDescriptor()
- .fromClass(ClassInstance()
+ val descriptor = new FunctionDescriptor()
+ .fromClass(new ClassInstance()
.of(classOf[java.lang.String].getName)
.parameterString("hello"))
@@ -67,17 +67,17 @@ class FunctionServiceTest {
@Test
def testNoArgFunctionCreation(): Unit = {
- val descriptor = FunctionDescriptor()
- .fromClass(ClassInstance().of(classOf[NoArgClass].getName))
+ val descriptor = new FunctionDescriptor()
+ .fromClass(new ClassInstance().of(classOf[NoArgClass].getName))
assertEquals(classOf[NoArgClass], FunctionService.createFunction(descriptor).getClass)
}
@Test
def testOneArgFunctionCreation(): Unit = {
- val descriptor = FunctionDescriptor()
+ val descriptor = new FunctionDescriptor()
.fromClass(
- ClassInstance()
+ new ClassInstance()
.of(classOf[OneArgClass].getName)
.parameterString("false"))
@@ -89,12 +89,12 @@ class FunctionServiceTest {
@Test
def testMultiArgFunctionCreation(): Unit = {
- val descriptor = FunctionDescriptor()
+ val descriptor = new FunctionDescriptor()
.fromClass(
- ClassInstance()
+ new ClassInstance()
.of(classOf[MultiArgClass].getName)
.parameter(new java.math.BigDecimal("12.0003"))
- .parameter(ClassInstance()
+ .parameter(new ClassInstance()
.of(classOf[java.math.BigInteger].getName)
.parameter("111111111111111111111111111111111")))