You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/23 15:41:20 UTC

[flink] branch master updated: [FLINK-13266][table] Port function-related descriptors to flink-table-common

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 24f1dce  [FLINK-13266][table] Port function-related descriptors to flink-table-common
24f1dce is described below

commit 24f1dce1ceca35ef0177fa16648fd40cbcb52ded
Author: godfreyhe <go...@163.com>
AuthorDate: Thu Jul 18 19:26:36 2019 +0800

    [FLINK-13266][table] Port function-related descriptors to flink-table-common
    
    FileSystem/OldCsv/RowtimeValidator/SchemaValidator will be ported in other commits
---
 .../flink/table/descriptors/ClassInstance.java     | 232 +++++++++++++++++++
 .../table/descriptors/ClassInstanceValidator.java  |  70 ++++++
 .../table/descriptors/FunctionDescriptor.java      |  56 +++++
 .../descriptors/FunctionDescriptorValidator.java   |  49 +++++
 .../table/descriptors/HierarchyDescriptor.java}    |  27 ++-
 .../descriptors/HierarchyDescriptorValidator.java} |  40 ++--
 .../flink/table/descriptors/LiteralValue.java      | 187 ++++++++++++++++
 .../table/descriptors/LiteralValueValidator.java   | 162 ++++++++++++++
 .../flink/table/descriptors/ClassInstance.scala    | 245 ---------------------
 .../table/descriptors/ClassInstanceValidator.scala |  59 -----
 .../table/descriptors/FunctionDescriptor.scala     |  62 ------
 .../descriptors/FunctionDescriptorValidator.scala  |  57 -----
 .../flink/table/descriptors/LiteralValue.scala     | 228 -------------------
 .../table/descriptors/LiteralValueValidator.scala  | 145 ------------
 .../table/descriptors/ClassInstanceTest.scala      |  12 +-
 .../table/descriptors/FunctionDescriptorTest.scala |   6 +-
 .../flink/table/descriptors/LiteralValueTest.scala |  18 +-
 .../table/functions/FunctionServiceTest.scala      |  30 +--
 18 files changed, 828 insertions(+), 857 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstance.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstance.java
new file mode 100644
index 0000000..573157d
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstance.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Either;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.types.Either.Left;
+import static org.apache.flink.types.Either.Right;
+
+/**
+ * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
+ * with a public constructor (with or without parameters).
+ */
+@PublicEvolving
+public class ClassInstance extends HierarchyDescriptor {
+
+	private String className;
+	// the parameter is either a literal value or the instance of a class
+	private List<Either<LiteralValue, ClassInstance>> constructor = new ArrayList<>();
+
+	/**
+	 * Sets the fully qualified class name for creating an instance.
+	 *
+	 * <p>E.g. org.example.MyClass or org.example.MyClass$StaticInnerClass
+	 *
+	 * @param className fully qualified class name
+	 */
+	public ClassInstance of(String className) {
+		this.className = className;
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of literal type. The type is automatically derived from
+	 * the value. Currently, this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. Expression
+	 * values are not allowed.
+	 *
+	 * <p>Examples:
+	 * - "true", "false" -> BOOLEAN
+	 * - "42", "-5" -> INT
+	 * - "2.0", "1234.222" -> DOUBLE
+	 * - VARCHAR otherwise
+	 *
+	 * <p>For other types and explicit type declaration use {@link #parameter(String, String)} or
+	 * {@link #parameter(TypeInformation, String)}.
+	 */
+	public ClassInstance parameterString(String valueString) {
+		constructor.add(Left(new LiteralValue().value(valueString)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of literal type. The type is explicitly defined using a
+	 * type string such as VARCHAR, FLOAT, BOOLEAN, INT, BIGINT, etc. The value is parsed
+	 * accordingly. Expression values are not allowed.
+	 *
+	 * @param typeString the type string that defines how to parse the given value string
+	 * @param valueString the literal value to be parsed
+	 */
+	public ClassInstance parameter(String typeString, String valueString) {
+		constructor.add(Left(new LiteralValue().of(typeString).value(valueString)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of literal type. The type is explicitly defined using
+	 * type information. The value is parsed accordingly. Expression values are not allowed.
+	 *
+	 * @param typeInfo the type that defines how to parse the given value string
+	 * @param valueString the literal value to be parsed
+	 */
+	public ClassInstance parameter(TypeInformation<?> typeInfo, String valueString) {
+		constructor.add(Left(new LiteralValue().of(typeInfo).value(valueString)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of BOOLEAN type.
+	 *
+	 * @param value BOOLEAN value
+	 */
+	public ClassInstance parameter(boolean value) {
+		constructor.add(Left(new LiteralValue().of(Types.BOOLEAN).value(value)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of DOUBLE type.
+	 *
+	 * @param value DOUBLE value
+	 */
+	public ClassInstance parameter(double value) {
+		constructor.add(Left(new LiteralValue().of(Types.DOUBLE).value(value)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of FLOAT type.
+	 *
+	 * @param value FLOAT value
+	 */
+	public ClassInstance parameter(float value) {
+		constructor.add(Left(new LiteralValue().of(Types.FLOAT).value(value)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of INT type.
+	 *
+	 * @param value INT value
+	 */
+	public ClassInstance parameter(int value) {
+		constructor.add(Left(new LiteralValue().of(Types.INT).value(value)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of VARCHAR type.
+	 *
+	 * @param value VARCHAR value
+	 */
+	public ClassInstance parameter(String value) {
+		constructor.add(Left(new LiteralValue().of(Types.STRING).value(value)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of BIGINT type.
+	 *
+	 * @param value BIGINT value
+	 */
+	public ClassInstance parameter(long value) {
+		constructor.add(Left(new LiteralValue().of(Types.LONG).value(value)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of TINYINT type.
+	 *
+	 * @param value TINYINT value
+	 */
+	public ClassInstance parameter(byte value) {
+		constructor.add(Left(new LiteralValue().of(Types.BYTE).value(value)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of SMALLINT type.
+	 *
+	 * @param value SMALLINT value
+	 */
+	public ClassInstance parameter(short value) {
+		constructor.add(Left(new LiteralValue().of(Types.SHORT).value(value)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of DECIMAL type.
+	 *
+	 * @param value DECIMAL value
+	 */
+	public ClassInstance parameter(BigDecimal value) {
+		constructor.add(Left(new LiteralValue().of(Types.BIG_DEC).value(value)));
+		return this;
+	}
+
+	/**
+	 * Adds a constructor parameter value of a class instance (i.e. a Java object with a public
+	 * constructor).
+	 *
+	 * @param classInstance description of a class instance (i.e. a Java object with a public
+	 * constructor).
+	 */
+	public ClassInstance parameter(ClassInstance classInstance) {
+		constructor.add(Right(classInstance));
+		return this;
+	}
+
+	/**
+	 * Converts this descriptor into a set of properties.
+	 */
+	@Override
+	public Map<String, String> toProperties() {
+		DescriptorProperties properties = new DescriptorProperties();
+		addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties);
+		return properties.asMap();
+	}
+
+	/**
+	 * Internal method for properties conversion.
+	 */
+	@Override
+	public void addPropertiesWithPrefix(String keyPrefix, DescriptorProperties properties) {
+		if (className != null) {
+			properties.putString(keyPrefix + ClassInstanceValidator.CLASS, className);
+		}
+		for (int i = 0; i < constructor.size(); ++i) {
+			Either<LiteralValue, ClassInstance> either = constructor.get(i);
+			String keyPrefixWithIdx = keyPrefix + ClassInstanceValidator.CONSTRUCTOR + "." + i + ".";
+			if (either.isLeft()) {
+				either.left().addPropertiesWithPrefix(keyPrefixWithIdx, properties);
+			} else {
+				either.right().addPropertiesWithPrefix(keyPrefixWithIdx, properties);
+			}
+		}
+	}
+
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstanceValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstanceValidator.java
new file mode 100644
index 0000000..a5abebb
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ClassInstanceValidator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Validator for {@link ClassInstance}.
+ */
+@Internal
+public class ClassInstanceValidator extends HierarchyDescriptorValidator {
+
+	public static final String CLASS = "class";
+	public static final String CONSTRUCTOR = "constructor";
+
+	/**
+	 * @param keyPrefix prefix to be added to every property before validation
+	 */
+	public ClassInstanceValidator(String keyPrefix) {
+		super(keyPrefix);
+	}
+
+	public ClassInstanceValidator() {
+		this(EMPTY_PREFIX);
+	}
+
+	@Override
+	protected void validateWithPrefix(String keyPrefix, DescriptorProperties properties) {
+		// check class name
+		properties.validateString(keyPrefix + CLASS, false, 1);
+
+		// check constructor
+		String constructorPrefix = keyPrefix + CONSTRUCTOR;
+
+		List<Map<String, String>> constructorProperties =
+				properties.getVariableIndexedProperties(constructorPrefix, new ArrayList<>());
+		for (int i = 0; i < constructorProperties.size(); ++i) {
+			String keyPrefixWithIdx = constructorPrefix + "." + i + ".";
+			if (constructorProperties.get(i).containsKey(ClassInstanceValidator.CLASS)) {
+				ClassInstanceValidator classInstanceValidator = new ClassInstanceValidator(keyPrefixWithIdx);
+				classInstanceValidator.validate(properties);
+			}
+			// literal value
+			else {
+				LiteralValueValidator primitiveValueValidator = new LiteralValueValidator(keyPrefixWithIdx);
+				primitiveValueValidator.validate(properties);
+			}
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptor.java
new file mode 100644
index 0000000..72e1068
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**
+ * Descriptor for describing a function.
+ */
+@PublicEvolving
+public class FunctionDescriptor implements Descriptor {
+	private String from;
+	private ClassInstance classInstance;
+
+	/**
+	 * Creates a function from a class description.
+	 */
+	public FunctionDescriptor fromClass(ClassInstance classType) {
+		from = FunctionDescriptorValidator.FROM_VALUE_CLASS;
+		this.classInstance = classType;
+		return this;
+	}
+
+	/**
+	 * Converts this descriptor into a set of properties.
+	 */
+	@Override
+	public Map<String, String> toProperties() {
+		DescriptorProperties properties = new DescriptorProperties();
+		if (from != null) {
+			properties.putString(FunctionDescriptorValidator.FROM, from);
+		}
+		if (classInstance != null) {
+			properties.putProperties(classInstance.toProperties());
+		}
+		return properties.asMap();
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptorValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptorValidator.java
new file mode 100644
index 0000000..059be19
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FunctionDescriptorValidator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Validator for {@link FunctionDescriptor}.
+ */
+@Internal
+public class FunctionDescriptorValidator implements DescriptorValidator {
+
+	public static final String FROM = "from";
+	public static final String FROM_VALUE_CLASS = "class";
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		Map<String, Consumer<String>> enumValidation = new HashMap<>();
+		enumValidation.put(FROM_VALUE_CLASS, s -> new ClassInstanceValidator().validate(properties));
+
+		// check for 'from'
+		if (properties.containsKey(FROM)) {
+			properties.validateEnum(FROM, false, enumValidation);
+		} else {
+			throw new ValidationException("Could not find 'from' property for function.");
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptor.java
similarity index 61%
rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptor.java
index b958b1c..a8790d7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptor.java
@@ -16,20 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
-  * A descriptor that may exist in an arbitrary level (be recursively included by other
-  * descriptors).
-  */
-abstract class HierarchyDescriptor extends Descriptor {
+ * A descriptor that may exist in an arbitrary level (be recursively included by other
+ * descriptors).
+ */
+@PublicEvolving
+public abstract class HierarchyDescriptor implements Descriptor {
 
-  /**
-    * Internal method for properties conversion. All the property keys will be prefixed with the
-    * given key prefix.
-    */
-  private[flink] def addPropertiesWithPrefix(
-    keyPrefix: String,
-    properties: DescriptorProperties): Unit
+	/**
+	 * Internal method for properties conversion. All the property keys will be prefixed with the
+	 * given key prefix.
+	 */
+	public abstract void addPropertiesWithPrefix(
+			String keyPrefix,
+			DescriptorProperties properties);
 
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.java
similarity index 52%
rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.java
index d7bf573..c52cebf 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.java
@@ -16,26 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
 
 /**
-  * Validator for a [[HierarchyDescriptor]].
-  *
-  * @param keyPrefix prefix to be added to every property before validation
-  */
-abstract class HierarchyDescriptorValidator(keyPrefix: String) extends DescriptorValidator{
+ * Validator for a {@link HierarchyDescriptor}.
+ */
+@Internal
+public abstract class HierarchyDescriptorValidator implements DescriptorValidator {
 
-  final def validate(properties: DescriptorProperties): Unit = {
-    validateWithPrefix(keyPrefix, properties)
-  }
+	public static final String EMPTY_PREFIX = "";
 
-  /**
-    * Performs validation with a prefix for the keys.
-    */
-  protected def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit
+	private String keyPrefix;
 
-}
+	/**
+	 * @param keyPrefix prefix to be added to every property before validation
+	 */
+	public HierarchyDescriptorValidator(String keyPrefix) {
+		this.keyPrefix = keyPrefix;
+	}
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		validateWithPrefix(keyPrefix, properties);
+	}
 
-object HierarchyDescriptorValidator {
-  val EMPTY_PREFIX = ""
+	/**
+	 * Performs validation with a prefix for the keys.
+	 */
+	protected abstract void validateWithPrefix(String keyPrefix, DescriptorProperties properties);
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValue.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValue.java
new file mode 100644
index 0000000..8801169
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValue.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.utils.TypeStringUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+/**
+ * Descriptor for a literal value. A literal value consists of a type and the actual value.
+ * Expression values are not allowed.
+ *
+ * <p>If no type is set, the type is automatically derived from the value. Currently,
+ * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+ *
+ * <p>Examples:
+ * - "true", "false" -> BOOLEAN
+ * - "42", "-5" -> INT
+ * - "2.0", "1234.222" -> DOUBLE
+ * - VARCHAR otherwise
+ */
+@PublicEvolving
+public class LiteralValue extends HierarchyDescriptor {
+	private String typeInfo;
+	private Object value;
+
+	/**
+	 * Type information of the literal value. E.g. Types.BOOLEAN.
+	 *
+	 * @param typeInfo type information describing the value
+	 */
+	public LiteralValue of(TypeInformation<?> typeInfo) {
+		Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
+		this.typeInfo = TypeStringUtils.writeTypeInfo(typeInfo);
+		return this;
+	}
+
+	/**
+	 * Type string of the literal value. E.g. "BOOLEAN".
+	 *
+	 * @param typeString type string describing the value
+	 */
+	public LiteralValue of(String typeString) {
+		this.typeInfo = typeString;
+		return this;
+	}
+
+	/**
+	 * Literal BOOLEAN value.
+	 *
+	 * @param value literal BOOLEAN value
+	 */
+	public LiteralValue value(boolean value) {
+		this.value = value;
+		return this;
+	}
+
+	/**
+	 * Literal INT value.
+	 *
+	 * @param value literal INT value
+	 */
+	public LiteralValue value(int value) {
+		this.value = value;
+		return this;
+	}
+
+	/**
+	 * Literal DOUBLE value.
+	 *
+	 * @param value literal DOUBLE value
+	 */
+	public LiteralValue value(double value) {
+		this.value = value;
+		return this;
+	}
+
+	/**
+	 * Literal FLOAT value.
+	 *
+	 * @param value literal FLOAT value
+	 */
+	public LiteralValue value(float value) {
+		this.value = value;
+		return this;
+	}
+
+	/**
+	 * Literal value either for an explicit VARCHAR type or automatically derived type.
+	 *
+	 * <p>If no type is set, the type is automatically derived from the value. Currently,
+	 * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
+	 *
+	 * @param value literal value
+	 */
+	public LiteralValue value(String value) {
+		this.value = value;
+		return this;
+	}
+
+	/**
+	 * Literal BIGINT value.
+	 *
+	 * @param value literal BIGINT value
+	 */
+	public LiteralValue value(long value) {
+		this.value = value;
+		return this;
+	}
+
+	/**
+	 * Literal TINYINT value.
+	 *
+	 * @param value literal TINYINT value
+	 */
+	public LiteralValue value(byte value) {
+		this.value = value;
+		return this;
+	}
+
+	/**
+	 * Literal SMALLINT value.
+	 *
+	 * @param value literal SMALLINT value
+	 */
+	public LiteralValue value(short value) {
+		this.value = value;
+		return this;
+	}
+
+	/**
+	 * Literal DECIMAL value.
+	 *
+	 * @param value literal DECIMAL value
+	 */
+	public LiteralValue value(BigDecimal value) {
+		this.value = value;
+		return this;
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		DescriptorProperties properties = new DescriptorProperties();
+		addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties);
+		return properties.asMap();
+	}
+
+	@Override
+	public void addPropertiesWithPrefix(String keyPrefix, DescriptorProperties properties) {
+		if (typeInfo != null) {
+			properties.putString(keyPrefix + "type", typeInfo);
+			if (value != null) {
+				properties.putString(keyPrefix + "value", String.valueOf(value));
+			}
+		} else {
+			// do not allow values in top-level
+			if (keyPrefix.equals(HierarchyDescriptorValidator.EMPTY_PREFIX)) {
+				throw new ValidationException(
+						"Literal values with implicit type must not exist in the top level of a hierarchy.");
+			}
+			if (value != null) {
+				properties.putString(keyPrefix.substring(0, keyPrefix.length() - 1), String.valueOf(value));
+			}
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValueValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValueValidator.java
new file mode 100644
index 0000000..ffd6503
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/LiteralValueValidator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+
+/**
+ * Validator for {@link LiteralValue}.
+ */
+@Internal
+public class LiteralValueValidator extends HierarchyDescriptorValidator {
+
+	public static final String TYPE = "type";
+	public static final String VALUE = "value";
+
+	/*
+	 * TODO The following types need to be supported next.
+	 * Types.SQL_DATE
+	 * Types.SQL_TIME
+	 * Types.SQL_TIMESTAMP
+	 * Types.PRIMITIVE_ARRAY
+	 * Types.OBJECT_ARRAY
+	 * Types.MAP
+	 * Types.MULTISET
+	 * null
+	 */
+
+	/**
+	 * Gets the value according to the type and value strings.
+	 *
+	 * @param keyPrefix the prefix of the literal type key
+	 * @param properties the descriptor properties
+	 * @return the derived value
+	 */
+	public static Object getValue(String keyPrefix, DescriptorProperties properties) {
+		String typeKey = keyPrefix + TYPE;
+		// explicit type
+		if (properties.containsKey(typeKey)) {
+			String valueKey = keyPrefix + VALUE;
+			TypeInformation<?> typeInfo = properties.getType(typeKey);
+
+			if (typeInfo.equals(Types.BIG_DEC)) {
+				return properties.getBigDecimal(valueKey);
+			} else if (typeInfo.equals(Types.BOOLEAN)) {
+				return properties.getBoolean(valueKey);
+			} else if (typeInfo.equals(Types.BYTE)) {
+				return properties.getByte(valueKey);
+			} else if (typeInfo.equals(Types.DOUBLE)) {
+				return properties.getDouble(valueKey);
+			} else if (typeInfo.equals(Types.FLOAT)) {
+				return properties.getFloat(valueKey);
+			} else if (typeInfo.equals(Types.INT)) {
+				return properties.getInt(valueKey);
+			} else if (typeInfo.equals(Types.LONG)) {
+				return properties.getLong(valueKey);
+			} else if (typeInfo.equals(Types.SHORT)) {
+				return properties.getShort(valueKey);
+			} else if (typeInfo.equals(Types.STRING)) {
+				return properties.getString(valueKey);
+			} else {
+				throw new TableException("Unsupported type '" + typeInfo.getTypeClass() + "'.");
+			}
+		}
+		// implicit type
+		else {
+			return deriveTypeStringFromValueString(
+					properties.getString(keyPrefix.substring(0, keyPrefix.length() - 1)));
+		}
+	}
+
+	/**
+	 * Tries to derive a literal value from the given string value.
+	 * The derivation priority for the types is BOOLEAN, INT, DOUBLE, and VARCHAR.
+	 *
+	 * @param valueString the string formatted value
+	 * @return parsed value
+	 */
+	private static Object deriveTypeStringFromValueString(String valueString) {
+		if (valueString.equals("true") || valueString.equals("false")) {
+			return Boolean.valueOf(valueString);
+		} else {
+			try {
+				return Integer.valueOf(valueString);
+			} catch (NumberFormatException e1) {
+				try {
+					return Double.valueOf(valueString);
+				} catch (NumberFormatException e2) {
+					return valueString;
+				}
+			}
+		}
+	}
+
+	/**
+	 * @param keyPrefix prefix to be added to every property before validation
+	 */
+	public LiteralValueValidator(String keyPrefix) {
+		super(keyPrefix);
+	}
+
+	@Override
+	protected void validateWithPrefix(String keyPrefix, DescriptorProperties properties) {
+		String typeKey = keyPrefix + TYPE;
+		properties.validateType(typeKey, true, false);
+
+		// explicit type
+		if (properties.containsKey(typeKey)) {
+			String valueKey = keyPrefix + VALUE;
+			TypeInformation<?> typeInfo = properties.getType(typeKey);
+			if (typeInfo.equals(Types.BIG_DEC)) {
+				properties.validateBigDecimal(valueKey, false);
+			} else if (typeInfo.equals(Types.BOOLEAN)) {
+				properties.validateBoolean(valueKey, false);
+			} else if (typeInfo.equals(Types.BYTE)) {
+				properties.validateByte(valueKey, false);
+			} else if (typeInfo.equals(Types.DOUBLE)) {
+				properties.validateDouble(valueKey, false);
+			} else if (typeInfo.equals(Types.FLOAT)) {
+				properties.validateFloat(valueKey, false);
+			} else if (typeInfo.equals(Types.INT)) {
+				properties.validateInt(valueKey, false);
+			} else if (typeInfo.equals(Types.LONG)) {
+				properties.validateLong(valueKey, false);
+			} else if (typeInfo.equals(Types.SHORT)) {
+				properties.validateShort(valueKey, false);
+			} else if (typeInfo.equals(Types.STRING)) {
+				properties.validateString(valueKey, false);
+			} else {
+				throw new TableException("Unsupported type '" + typeInfo + "'.");
+			}
+		}
+		// implicit type
+		else {
+			// do not allow values in top-level
+			if (keyPrefix.equals(HierarchyDescriptorValidator.EMPTY_PREFIX)) {
+				throw new ValidationException(
+						"Literal values with implicit type must not exist in the top level of a hierarchy.");
+			}
+			properties.validateString(keyPrefix.substring(0, keyPrefix.length() - 1), false);
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
deleted file mode 100644
index 34ed6dd..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstance.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.Types
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
-  * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
-  * with a public constructor (with or without parameters).
-  */
-class ClassInstance extends HierarchyDescriptor {
-
-  private var className: Option[String] = None
-
-  // the parameter is either a literal value or the instance of a class
-  private val constructor: ArrayBuffer[Either[LiteralValue, ClassInstance]] = ArrayBuffer()
-
-  /**
-    * Sets the fully qualified class name for creating an instance.
-    *
-    * E.g. org.example.MyClass or org.example.MyClass$StaticInnerClass
-    *
-    * @param className fully qualified class name
-    */
-  def of(className: String): ClassInstance = {
-    this.className = Option(className)
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of literal type. The type is automatically derived from
-    * the value. Currently, this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR. Expression
-    * values are not allowed.
-    *
-    * Examples:
-    *   - "true", "false" -> BOOLEAN
-    *   - "42", "-5" -> INT
-    *   - "2.0", "1234.222" -> DOUBLE
-    *   - VARCHAR otherwise
-    *
-    * For other types and explicit type declaration use [[parameter(String, String)]] or
-    * [[parameter(TypeInformation, String)]].
-    *
-    */
-  def parameterString(valueString: String): ClassInstance = {
-    constructor += Left(new LiteralValue().value(valueString))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of literal type. The type is explicitly defined using a
-    * type string such as VARCHAR, FLOAT, BOOLEAN, INT, BIGINT, etc. The value is parsed
-    * accordingly. Expression values are not allowed.
-    *
-    * @param typeString the type string that define how to parse the given value string
-    * @param valueString the literal value to be parsed
-    */
-  def parameter(typeString: String, valueString: String): ClassInstance = {
-    constructor += Left(new LiteralValue().of(typeString).value(valueString))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of literal type. The type is explicitly defined using
-    * type information. The value is parsed accordingly. Expression values are not allowed.
-    *
-    * @param typeInfo the type that define how to parse the given value string
-    * @param valueString the literal value to be parsed
-    */
-  def parameter(typeInfo: TypeInformation[_], valueString: String): ClassInstance = {
-    constructor += Left(new LiteralValue().of(typeInfo).value(valueString))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of BOOLEAN type.
-    *
-    * @param value BOOLEAN value
-    */
-  def parameter(value: Boolean): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.BOOLEAN).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of DOUBLE type.
-    *
-    * @param value DOUBLE value
-    */
-  def parameter(value: Double): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.DOUBLE).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of FLOAT type.
-    *
-    * @param value FLOAT value
-    */
-  def parameter(value: Float): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.FLOAT).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of INT type.
-    *
-    * @param value INT value
-    */
-  def parameter(value: Int): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.INT).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of VARCHAR type.
-    *
-    * @param value VARCHAR value
-    */
-  def parameter(value: String): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.STRING).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of BIGINT type.
-    *
-    * @param value BIGINT value
-    */
-  def parameter(value: Long): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.LONG).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of TINYINT type.
-    *
-    * @param value TINYINT value
-    */
-  def parameter(value: Byte): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.BYTE).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of SMALLINT type.
-    *
-    * @param value SMALLINT value
-    */
-  def parameter(value: Short): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.SHORT).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of DECIMAL type.
-    *
-    * @param value DECIMAL value
-    */
-  def parameter(value: java.math.BigDecimal): ClassInstance = {
-    constructor += Left(new LiteralValue().of(Types.DECIMAL).value(value))
-    this
-  }
-
-  /**
-    * Adds a constructor parameter value of a class instance (i.e. a Java object with a public
-    * constructor).
-    *
-    * @param classInstance description of a class instance (i.e. a Java object with a public
-    *                      constructor).
-    */
-  def parameter(classInstance: ClassInstance): ClassInstance = {
-    constructor += Right(classInstance)
-    this
-  }
-
-  /**
-    * Converts this descriptor into a set of properties.
-    */
-  override def toProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-    addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties)
-    properties.asMap()
-  }
-
-  /**
-    * Internal method for properties conversion.
-    */
-  override private[flink] def addPropertiesWithPrefix(
-      keyPrefix: String,
-      properties: DescriptorProperties): Unit = {
-    className.foreach(properties.putString(s"$keyPrefix${ClassInstanceValidator.CLASS}", _))
-    var i = 0
-    while (i < constructor.size) {
-      constructor(i) match {
-        case Left(literalValue) =>
-          literalValue.addPropertiesWithPrefix(
-            s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
-            properties)
-        case Right(classInstance) =>
-          classInstance.addPropertiesWithPrefix(
-            s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}.$i.",
-            properties)
-      }
-      i += 1
-    }
-  }
-}
-
-/**
-  * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
-  * with a public constructor (with or without parameters).
-  */
-object ClassInstance {
-
-  /**
-    * Descriptor for a class instance. A class instance is a Java/Scala object created from a class
-    * with a public constructor (with or without parameters).
-    *
-    * @deprecated Use `new ClassInstance()`.
-    */
-  @deprecated
-  def apply() = new ClassInstance
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
deleted file mode 100644
index 43da1e1..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/ClassInstanceValidator.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.HierarchyDescriptorValidator.EMPTY_PREFIX
-
-import scala.collection.JavaConversions._
-
-/**
-  * Validator for [[ClassInstance]].
-  */
-class ClassInstanceValidator(keyPrefix: String = EMPTY_PREFIX)
-  extends HierarchyDescriptorValidator(keyPrefix) {
-
-  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
-    // check class name
-    properties.validateString(s"$keyPrefix${ClassInstanceValidator.CLASS}", false, 1)
-
-    // check constructor
-    val constructorPrefix = s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}"
-
-    val constructorProperties = properties.getVariableIndexedProperties(constructorPrefix, List())
-    var i = 0
-    while (i < constructorProperties.size()) {
-      // nested class instance
-      if (constructorProperties(i).containsKey(ClassInstanceValidator.CLASS)) {
-        val classInstanceValidator = new ClassInstanceValidator(s"$constructorPrefix.$i.")
-        classInstanceValidator.validate(properties)
-      }
-      // literal value
-      else {
-        val primitiveValueValidator = new LiteralValueValidator(s"$constructorPrefix.$i.")
-        primitiveValueValidator.validate(properties)
-      }
-      i += 1
-    }
-  }
-}
-
-object ClassInstanceValidator {
-  val CLASS = "class"
-  val CONSTRUCTOR = "constructor"
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
deleted file mode 100644
index 59aef11..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-import java.util
-
-/**
-  * Descriptor for describing a function.
-  */
-class FunctionDescriptor extends Descriptor {
-
-  private var from: Option[String] = None
-  private var classInstance: Option[ClassInstance] = None
-
-  /**
-    * Creates a function from a class description.
-    */
-  def fromClass(classType: ClassInstance): FunctionDescriptor = {
-    from = Some(FunctionDescriptorValidator.FROM_VALUE_CLASS)
-    this.classInstance = Option(classType)
-    this
-  }
-
-  /**
-    * Converts this descriptor into a set of properties.
-    */
-  override def toProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-    from.foreach(properties.putString(FunctionDescriptorValidator.FROM, _))
-    classInstance.foreach(d => properties.putProperties(d.toProperties))
-    properties.asMap()
-  }
-}
-
-/**
-  * Descriptor for describing a function.
-  */
-object FunctionDescriptor {
-
-  /**
-    * Descriptor for describing a function.
-    *
-    * @deprecated Use `new FunctionDescriptor()`.
-    */
-  @deprecated
-  def apply(): FunctionDescriptor = new FunctionDescriptor()
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
deleted file mode 100644
index db8a1b5..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptorValidator.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.FunctionDescriptorValidator.FROM
-import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
-
-import scala.collection.JavaConverters._
-
-/**
-  * Validator for [[FunctionDescriptor]].
-  */
-class FunctionDescriptorValidator extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-
-    val classValidation = (_: String) => {
-      new ClassInstanceValidator().validate(properties)
-    }
-
-    // check for 'from'
-    if (properties.containsKey(FROM)) {
-      properties.validateEnum(
-        FROM,
-        false,
-        Map(
-          FunctionDescriptorValidator.FROM_VALUE_CLASS -> toJava(classValidation)
-        ).asJava
-      )
-    } else {
-      throw new ValidationException("Could not find 'from' property for function.")
-    }
-  }
-}
-
-object FunctionDescriptorValidator {
-
-  val FROM = "from"
-  val FROM_VALUE_CLASS = "class"
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
deleted file mode 100644
index 62bd6f8..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValue.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.utils.TypeStringUtils
-import org.apache.flink.util.Preconditions
-
-/**
-  * Descriptor for a literal value. A literal value consists of a type and the actual value.
-  * Expression values are not allowed.
-  *
-  * If no type is set, the type is automatically derived from the value. Currently,
-  * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
-  *
-  * Examples:
-  *   - "true", "false" -> BOOLEAN
-  *   - "42", "-5" -> INT
-  *   - "2.0", "1234.222" -> DOUBLE
-  *   - VARCHAR otherwise
-  */
-class LiteralValue extends HierarchyDescriptor {
-
-  var typeInfo: Option[String] = None
-  var value: Option[Any] = None
-
-  /**
-    * Type information of the literal value. E.g. Types.BOOLEAN.
-    *
-    * @param typeInfo type information describing the value
-    */
-  def of(typeInfo: TypeInformation[_]): LiteralValue = {
-    Preconditions.checkNotNull("Type information must not be null.")
-    this.typeInfo = Option(TypeStringUtils.writeTypeInfo(typeInfo))
-    this
-  }
-
-  /**
-    * Type string of the literal value. E.g. "BOOLEAN".
-    *
-    * @param typeString type string describing the value
-    */
-  def of(typeString: String): LiteralValue = {
-    this.typeInfo = Option(typeString)
-    this
-  }
-
-  /**
-    * Literal BOOLEAN value.
-    *
-    * @param value literal BOOLEAN value
-    */
-  def value(value: Boolean): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal INT value.
-    *
-    * @param value literal INT value
-    */
-  def value(value: Int): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal DOUBLE value.
-    *
-    * @param value literal DOUBLE value
-    */
-  def value(value: Double): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal FLOAT value.
-    *
-    * @param value literal FLOAT value
-    */
-  def value(value: Float): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal value either for an explicit VARCHAR type or automatically derived type.
-    *
-    * If no type is set, the type is automatically derived from the value. Currently,
-    * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
-    *
-    * @param value literal value
-    */
-  def value(value: String): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal BIGINT value.
-    *
-    * @param value literal BIGINT value
-    */
-  def value(value: Long): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal TINYINT value.
-    *
-    * @param value literal TINYINT value
-    */
-  def value(value: Byte): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal SMALLINT value.
-    *
-    * @param value literal SMALLINT value
-    */
-  def value(value: Short): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Literal DECIMAL value.
-    *
-    * @param value literal DECIMAL value
-    */
-  def value(value: java.math.BigDecimal): LiteralValue = {
-    this.value = Option(value)
-    this
-  }
-
-  /**
-    * Converts this descriptor into a set of properties.
-    */
-  override def toProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-    addPropertiesWithPrefix(HierarchyDescriptorValidator.EMPTY_PREFIX, properties)
-    properties.asMap()
-  }
-
-  /**
-    * Internal method for properties conversion.
-    */
-  override private[flink] def addPropertiesWithPrefix(
-      keyPrefix: String,
-      properties: DescriptorProperties)
-    : Unit = {
-
-    typeInfo match {
-      // explicit type
-      case Some(ti) =>
-        properties.putString(keyPrefix + "type", ti)
-        value.foreach(v => properties.putString(keyPrefix + "value", String.valueOf(v)))
-      // implicit type
-      case None =>
-        // do not allow values in top-level
-        if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) {
-          throw new ValidationException(
-            "Literal values with implicit type must not exist in the top level of a hierarchy.")
-        }
-        value.foreach { v =>
-          properties.putString(keyPrefix.substring(0, keyPrefix.length - 1), String.valueOf(v))
-        }
-    }
-  }
-}
-
-/**
-  * Descriptor for a literal value. A literal value consists of a type and the actual value.
-  * Expression values are not allowed.
-  *
-  * If no type is set, the type is automatically derived from the value. Currently,
-  * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
-  *
-  * Examples:
-  *   - "true", "false" -> BOOLEAN
-  *   - "42", "-5" -> INT
-  *   - "2.0", "1234.222" -> DOUBLE
-  *   - VARCHAR otherwise
-  */
-object LiteralValue {
-
-  /**
-    * Descriptor for a literal value. A literal value consists of a type and the actual value.
-    * Expression values are not allowed.
-    *
-    * If no type is set, the type is automatically derived from the value. Currently,
-    * this is supported for: BOOLEAN, INT, DOUBLE, and VARCHAR.
-    *
-    * Examples:
-    *   - "true", "false" -> BOOLEAN
-    *   - "42", "-5" -> INT
-    *   - "2.0", "1234.222" -> DOUBLE
-    *   - VARCHAR otherwise
-    *
-    *   @deprecated Use `new Literal()`.
-    */
-  @deprecated
-  def apply() = new LiteralValue()
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
deleted file mode 100644
index 4e85858..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt}
-
-import org.apache.flink.api.common.typeinfo.Types
-import org.apache.flink.table.api.{TableException, ValidationException}
-
-/**
-  * Validator for [[LiteralValue]].
-  */
-class LiteralValueValidator(keyPrefix: String) extends HierarchyDescriptorValidator(keyPrefix) {
-
-  /*
-   * TODO The following types need to be supported next.
-   * Types.SQL_DATE
-   * Types.SQL_TIME
-   * Types.SQL_TIMESTAMP
-   * Types.PRIMITIVE_ARRAY
-   * Types.OBJECT_ARRAY
-   * Types.MAP
-   * Types.MULTISET
-   * null
-   */
-
-  override protected def validateWithPrefix(
-      keyPrefix: String,
-      properties: DescriptorProperties)
-    : Unit = {
-
-    val typeKey = s"$keyPrefix${LiteralValueValidator.TYPE}"
-
-    properties.validateType(typeKey, true, false)
-
-    // explicit type
-    if (properties.containsKey(typeKey)) {
-      val valueKey = s"$keyPrefix${LiteralValueValidator.VALUE}"
-      val typeInfo = properties.getType(typeKey)
-      typeInfo match {
-        case Types.BIG_DEC => properties.validateBigDecimal(valueKey, false)
-        case Types.BOOLEAN => properties.validateBoolean(valueKey, false)
-        case Types.BYTE => properties.validateByte(valueKey, false)
-        case Types.DOUBLE => properties.validateDouble(valueKey, false)
-        case Types.FLOAT => properties.validateFloat(valueKey, false)
-        case Types.INT => properties.validateInt(valueKey, false)
-        case Types.LONG => properties.validateLong(valueKey, false)
-        case Types.SHORT => properties.validateShort(valueKey, false)
-        case Types.STRING => properties.validateString(valueKey, false)
-        case _ => throw new TableException(s"Unsupported type '$typeInfo'.")
-      }
-    }
-    // implicit type
-    else {
-      // do not allow values in top-level
-      if (keyPrefix == HierarchyDescriptorValidator.EMPTY_PREFIX) {
-        throw new ValidationException(
-          "Literal values with implicit type must not exist in the top level of a hierarchy.")
-      }
-      properties.validateString(keyPrefix.substring(0, keyPrefix.length - 1), false)
-    }
-  }
-}
-
-object LiteralValueValidator {
-  val TYPE = "type"
-  val VALUE = "value"
-
-  private val LITERAL_FALSE = "false"
-  private val LITERAL_TRUE = "true"
-
-  /**
-    * Gets the value according to the type and value strings.
-    *
-    * @param keyPrefix the prefix of the literal type key
-    * @param properties the descriptor properties
-    * @return the derived value
-    */
-  def getValue(keyPrefix: String, properties: DescriptorProperties): Any = {
-    val typeKey = s"$keyPrefix$TYPE"
-    // explicit type
-    if (properties.containsKey(typeKey)) {
-      val valueKey = s"$keyPrefix$VALUE"
-      val typeInfo = properties.getType(typeKey)
-      typeInfo match {
-        case Types.BIG_DEC => properties.getBigDecimal(valueKey)
-        case Types.BOOLEAN => properties.getBoolean(valueKey)
-        case Types.BYTE => properties.getByte(valueKey)
-        case Types.DOUBLE => properties.getDouble(valueKey)
-        case Types.FLOAT => properties.getFloat(valueKey)
-        case Types.INT => properties.getInt(valueKey)
-        case Types.LONG => properties.getLong(valueKey)
-        case Types.SHORT => properties.getShort(valueKey)
-        case Types.STRING => properties.getString(valueKey)
-        case _ => throw new TableException(s"Unsupported type '${typeInfo.getTypeClass}'.")
-      }
-    }
-    // implicit type
-    else {
-      deriveTypeStringFromValueString(
-        properties.getString(keyPrefix.substring(0, keyPrefix.length - 1)))
-    }
-  }
-
-  /**
-    * Tries to derive a literal value from the given string value.
-    * The derivation priority for the types are BOOLEAN, INT, DOUBLE, and VARCHAR.
-    *
-    * @param valueString the string formatted value
-    * @return parsed value
-    */
-  def deriveTypeStringFromValueString(valueString: String): AnyRef = {
-    if (valueString.equals(LITERAL_TRUE) || valueString.equals(LITERAL_FALSE)) {
-      JBoolean.valueOf(valueString)
-    } else {
-      try {
-        JInt.valueOf(valueString)
-      } catch {
-        case _: NumberFormatException =>
-          try {
-            JDouble.valueOf(valueString)
-          } catch {
-            case _: NumberFormatException =>
-              valueString
-          }
-      }
-    }
-  }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala
index 8b75e15..fdb0ccf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/ClassInstanceTest.scala
@@ -33,25 +33,25 @@ class ClassInstanceTest extends DescriptorTestBase {
   }
 
   override def descriptors(): JList[Descriptor] = {
-    val desc1 = ClassInstance()
+    val desc1 = new ClassInstance()
       .of("class1")
       .parameter(Types.LONG, "1")
       .parameter(
-        ClassInstance()
+        new ClassInstance()
           .of("class2")
           .parameter(
-            ClassInstance()
+            new ClassInstance()
               .of("class3")
               .parameterString("StarryNight")
               .parameter(
-                ClassInstance()
+                new ClassInstance()
                     .of("class4"))))
       .parameter(2L)
 
-    val desc2 = ClassInstance()
+    val desc2 = new ClassInstance()
       .of("class2")
 
-    val desc3 = ClassInstance()
+    val desc3 = new ClassInstance()
       .of("org.example.Function")
       .parameter(42)
       .parameter(2.asInstanceOf[Byte])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala
index 1cd763f..a541850 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FunctionDescriptorTest.scala
@@ -28,13 +28,13 @@ import scala.collection.JavaConverters._
 class FunctionDescriptorTest extends DescriptorTestBase {
 
   override def descriptors(): JList[Descriptor] = {
-    val desc1 = FunctionDescriptor()
+    val desc1 = new FunctionDescriptor()
       .fromClass(
-        ClassInstance()
+        new ClassInstance()
           .of("my.class")
           .parameter("INT", "1")
           .parameter(
-            ClassInstance()
+            new ClassInstance()
               .of("my.class2")
               .parameterString("true")))
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala
index 271d563..8f633b1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/LiteralValueTest.scala
@@ -43,15 +43,15 @@ class LiteralValueTest extends DescriptorTestBase {
   }
 
   override def descriptors(): JList[Descriptor] = {
-    val bigDecimalDesc = LiteralValue().of(Types.DECIMAL).value(new JBigDecimal(1))
-    val booleanDesc = LiteralValue().of(Types.BOOLEAN).value(false)
-    val byteDesc = LiteralValue().of(Types.BYTE).value(4.asInstanceOf[Byte])
-    val doubleDesc = LiteralValue().of(Types.DOUBLE).value(7.0)
-    val floatDesc = LiteralValue().of(Types.FLOAT).value(8.0f)
-    val intDesc = LiteralValue().of(Types.INT).value(9)
-    val longDesc = LiteralValue().of(Types.LONG).value(10L)
-    val shortDesc = LiteralValue().of(Types.SHORT).value(11.asInstanceOf[Short])
-    val stringDesc = LiteralValue().of(Types.STRING).value("12")
+    val bigDecimalDesc = new LiteralValue().of(Types.DECIMAL).value(new JBigDecimal(1))
+    val booleanDesc = new LiteralValue().of(Types.BOOLEAN).value(false)
+    val byteDesc = new LiteralValue().of(Types.BYTE).value(4.asInstanceOf[Byte])
+    val doubleDesc = new LiteralValue().of(Types.DOUBLE).value(7.0)
+    val floatDesc = new LiteralValue().of(Types.FLOAT).value(8.0f)
+    val intDesc = new LiteralValue().of(Types.INT).value(9)
+    val longDesc = new LiteralValue().of(Types.LONG).value(10L)
+    val shortDesc = new LiteralValue().of(Types.SHORT).value(11.asInstanceOf[Short])
+    val stringDesc = new LiteralValue().of(Types.STRING).value("12")
 
     // for tests with implicit type see ClassInstanceTest because literal value are not
     // supported in the top level of a hierarchy
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
index bcef012..2ffc423 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
@@ -31,8 +31,8 @@ class FunctionServiceTest {
 
   @Test(expected = classOf[ValidationException])
   def testWrongArgsFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance()
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance()
         .of(classOf[NoArgClass].getName)
         .parameterString("12"))
 
@@ -41,24 +41,24 @@ class FunctionServiceTest {
 
   @Test(expected = classOf[ValidationException])
   def testPrivateFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance().of(classOf[PrivateClass].getName))
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance().of(classOf[PrivateClass].getName))
 
     FunctionService.createFunction(descriptor)
   }
 
   @Test(expected = classOf[ValidationException])
   def testInvalidClassFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance().of("this.class.does.not.exist"))
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance().of("this.class.does.not.exist"))
 
     FunctionService.createFunction(descriptor)
   }
 
   @Test(expected = classOf[ValidationException])
   def testNotFunctionClassFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance()
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance()
         .of(classOf[java.lang.String].getName)
         .parameterString("hello"))
 
@@ -67,17 +67,17 @@ class FunctionServiceTest {
 
   @Test
   def testNoArgFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
-      .fromClass(ClassInstance().of(classOf[NoArgClass].getName))
+    val descriptor = new FunctionDescriptor()
+      .fromClass(new ClassInstance().of(classOf[NoArgClass].getName))
 
     assertEquals(classOf[NoArgClass], FunctionService.createFunction(descriptor).getClass)
   }
 
   @Test
   def testOneArgFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
+    val descriptor = new FunctionDescriptor()
       .fromClass(
-        ClassInstance()
+        new ClassInstance()
           .of(classOf[OneArgClass].getName)
           .parameterString("false"))
 
@@ -89,12 +89,12 @@ class FunctionServiceTest {
 
   @Test
   def testMultiArgFunctionCreation(): Unit = {
-    val descriptor = FunctionDescriptor()
+    val descriptor = new FunctionDescriptor()
       .fromClass(
-        ClassInstance()
+        new ClassInstance()
           .of(classOf[MultiArgClass].getName)
           .parameter(new java.math.BigDecimal("12.0003"))
-          .parameter(ClassInstance()
+          .parameter(new ClassInstance()
             .of(classOf[java.math.BigInteger].getName)
             .parameter("111111111111111111111111111111111")))