You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/10 12:42:13 UTC

[3/3] flink git commit: [FLINK-8863] [sql-client] Add user-defined function support in SQL Client

[FLINK-8863] [sql-client] Add user-defined function support in SQL Client

This closes #6090.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8014dade
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8014dade
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8014dade

Branch: refs/heads/master
Commit: 8014dade4ba844786afa85fd517d6aba807dbc81
Parents: 79b38f8
Author: Xingcan Cui <xi...@gmail.com>
Authored: Sun May 27 23:36:25 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Jul 10 13:55:21 2018 +0200

----------------------------------------------------------------------
 .../flink/table/client/config/Environment.java  |  20 ++
 .../client/config/UserDefinedFunction.java      |  98 +++++++
 .../client/gateway/local/ExecutionContext.java  |  44 +++
 .../gateway/local/LocalExecutorITCase.java      | 122 ++++++--
 .../gateway/utils/UserDefinedFunctions.java     | 105 +++++++
 .../src/test/resources/test-sql-client-udf.yaml |  82 ++++++
 .../flink/table/descriptors/ClassType.scala     | 123 ++++++++
 .../table/descriptors/ClassTypeValidator.scala  |  75 +++++
 .../descriptors/DescriptorProperties.scala      | 282 +++++++++++++++----
 .../table/descriptors/FunctionDescriptor.scala  |  52 ++++
 .../table/descriptors/FunctionValidator.scala   |  42 +++
 .../table/descriptors/HierarchyDescriptor.scala |  34 +++
 .../HierarchyDescriptorValidator.scala          |  35 +++
 .../flink/table/descriptors/PrimitiveType.scala |  55 ++++
 .../descriptors/PrimitiveTypeValidator.scala    | 146 ++++++++++
 .../descriptors/service/FunctionService.scala   |  87 ++++++
 .../flink/table/descriptors/ClassTypeTest.scala |  77 +++++
 .../flink/table/descriptors/FunctionTest.scala  |  65 +++++
 .../table/descriptors/PrimitiveTypeTest.scala   | 117 ++++++++
 19 files changed, 1591 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 0efb665..c1db4c1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -46,10 +46,13 @@ public class Environment {
 
 	private Deployment deployment;
 
+	private Map<String, UserDefinedFunction> functions;
+
 	public Environment() {
 		this.tables = Collections.emptyMap();
 		this.execution = new Execution();
 		this.deployment = new Deployment();
+		this.functions = Collections.emptyMap();
 	}
 
 	public Map<String, TableDescriptor> getTables() {
@@ -76,6 +79,14 @@ public class Environment {
 		});
 	}
 
+	public void setFunctions(List<Map<String, Object>> functions) {
+		this.functions = new HashMap<>(functions.size());
+		functions.forEach(config -> {
+			final UserDefinedFunction f = UserDefinedFunction.create(config);
+			this.functions.put(f.name(), f);
+		});
+	}
+
 	public void setExecution(Map<String, Object> config) {
 		this.execution = Execution.create(config);
 	}
@@ -92,6 +103,10 @@ public class Environment {
 		return deployment;
 	}
 
+	public Map<String, UserDefinedFunction> getFunctions() {
+		return functions;
+	}
+
 	@Override
 	public String toString() {
 		final StringBuilder sb = new StringBuilder();
@@ -136,6 +151,11 @@ public class Environment {
 		tables.putAll(env2.getTables());
 		mergedEnv.tables = tables;
 
+		// merge functions
+		final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions());
+		mergedEnv.getFunctions().putAll(env2.getFunctions());
+		mergedEnv.functions = functions;
+
 		// merge execution properties
 		mergedEnv.execution = Execution.merge(env1.getExecution(), env2.getExecution());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
new file mode 100644
index 0000000..235f4ea
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UserDefinedFunction.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UserDefinedFunction extends FunctionDescriptor {
+
+	private static final String FROM = "from";
+
+	private From from;
+
+	private Map<String, String> properties;
+
+	private UserDefinedFunction(String name, From from, Map<String, String> properties) {
+		super(name);
+		this.from = from;
+		this.properties = properties;
+	}
+
+	/**
+	 * Gets where the user-defined function should be created from.
+	 */
+	public From getFrom() {
+		return from;
+	}
+
+	/**
+	 * Creates a UDF descriptor with the given config.
+	 */
+	public static UserDefinedFunction create(Map<String, Object> config) {
+		Map<String, String> udfConfig = ConfigUtil.normalizeYaml(config);
+		if (!udfConfig.containsKey(FunctionValidator.FUNCTION_NAME())) {
+			throw new SqlClientException("The 'name' attribute of a function is missing.");
+		}
+
+		final String name = udfConfig.get(FunctionValidator.FUNCTION_NAME());
+		if (name.trim().length() <= 0) {
+			throw new SqlClientException("Invalid function name '" + name + "'.");
+		}
+
+		// the default value is "CLASS"
+		From fromValue = CLASS;
+
+		if (udfConfig.containsKey(FROM)) {
+			final String from = udfConfig.get(FROM);
+			try {
+				fromValue = From.valueOf(from.toUpperCase());
+			} catch (IllegalArgumentException ex) {
+				throw new SqlClientException("Unknown 'from' value '" + from + "'.");
+			}
+		}
+
+		switch (fromValue) {
+			case CLASS:
+				return new UserDefinedFunction(name, fromValue, udfConfig);
+			default:
+				throw new SqlClientException("The from attribute can only be \"class\" now.");
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void addProperties(DescriptorProperties properties) {
+		this.properties.forEach(properties::putString);
+	}
+
+	enum From {
+		CLASS
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index c62faa4..0cd9738 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -42,11 +42,19 @@ import org.apache.flink.table.api.BatchQueryConfig;
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.client.config.Deployment;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.TableSourceDescriptor;
+import org.apache.flink.table.descriptors.service.FunctionService;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceFactoryService;
 import org.apache.flink.util.FlinkException;
@@ -73,6 +81,7 @@ public class ExecutionContext<T> {
 	private final List<URL> dependencies;
 	private final ClassLoader classLoader;
 	private final Map<String, TableSource<?>> tableSources;
+	private final Map<String, UserDefinedFunction> functions;
 	private final Configuration flinkConfig;
 	private final CommandLine commandLine;
 	private final CustomCommandLine<T> activeCommandLine;
@@ -102,6 +111,16 @@ public class ExecutionContext<T> {
 			}
 		});
 
+		// generate user-defined functions
+		functions = new HashMap<>();
+		mergedEnv.getFunctions().forEach((name, descriptor) -> {
+			DescriptorProperties properties = new DescriptorProperties(true);
+			descriptor.addProperties(properties);
+			functions.put(
+					name,
+					FunctionService.generateUserDefinedFunction(properties, classLoader));
+		});
+
 		// convert deployment options into command line options that describe a cluster
 		commandLine = createCommandLine(mergedEnv.getDeployment(), commandLineOptions);
 		activeCommandLine = findActiveCommandLine(availableCommandLines, commandLine);
@@ -207,6 +226,31 @@ public class ExecutionContext<T> {
 
 			// register table sources
 			tableSources.forEach(tableEnv::registerTableSource);
+
+			// register UDFs
+			if (tableEnv instanceof StreamTableEnvironment) {
+				StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
+				functions.forEach((k, v) -> {
+					if (v instanceof ScalarFunction) {
+						streamTableEnvironment.registerFunction(k, (ScalarFunction) v);
+					} else if (v instanceof AggregateFunction) {
+						streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+					} else if (v instanceof TableFunction) {
+						streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+					}
+				});
+			} else {
+				BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment) tableEnv;
+				functions.forEach((k, v) -> {
+					if (v instanceof ScalarFunction) {
+						batchTableEnvironment.registerFunction(k, (ScalarFunction) v);
+					} else if (v instanceof AggregateFunction) {
+						batchTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+					} else if (v instanceof TableFunction) {
+						batchTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+					}
+				});
+			}
 		}
 
 		public QueryConfig getQueryConfig() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 9a1eb08..b32353f 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -66,6 +66,7 @@ import static org.junit.Assert.assertTrue;
 public class LocalExecutorITCase extends TestLogger {
 
 	private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
+	private static final String UDF_ENVIRONMENT_FILE = "test-sql-client-udf.yaml";
 
 	private static final int NUM_TMS = 2;
 	private static final int NUM_SLOTS_PER_TM = 2;
@@ -149,6 +150,68 @@ public class LocalExecutorITCase extends TestLogger {
 	}
 
 	@Test(timeout = 30_000L)
+	public void testScalarUDF() throws Exception {
+		final Executor executor =
+				createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
+		final SessionContext session = new SessionContext("test-scalarUDF", new Environment());
+		final ResultDescriptor rd =
+				executor.executeQuery(session, "SELECT scalarUDF(10)");
+		final List<String> actualResults =
+				retrieveChangelogResult(executor, session, rd.getResultId());
+		final List<String> expectedResults = new ArrayList<>();
+		expectedResults.add("(true,15)");
+		TestBaseUtils.compareResultCollections(
+				expectedResults, actualResults, Comparator.naturalOrder());
+	}
+
+	@Test(timeout = 30_000L)
+	public void testAggregateUDF() throws Exception {
+		final Executor executor =
+				createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
+		final SessionContext session = new SessionContext("test-aggregateUDF", new Environment());
+		final ResultDescriptor rd =
+				executor.executeQuery(session, "SELECT aggregateUDF(cast(1 as BIGINT))");
+		final List<String> actualResults =
+				retrieveChangelogResult(executor, session, rd.getResultId());
+		final List<String> expectedResults = new ArrayList<>();
+		expectedResults.add("(true,100)");
+		TestBaseUtils.compareResultCollections(
+				expectedResults, actualResults, Comparator.naturalOrder());
+	}
+
+	@Test(timeout = 30_000L)
+	public void testTableUDF() throws Exception {
+		final URL url = getClass().getClassLoader().getResource("test-data.csv");
+		Objects.requireNonNull(url);
+		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_0", url.getPath());
+		final Executor executor =
+				createModifiedExecutor(UDF_ENVIRONMENT_FILE, clusterClient, replaceVars);
+
+		final SessionContext session = new SessionContext("test-aggregateUDF", new Environment());
+		final ResultDescriptor rd =
+				executor.executeQuery(
+						session,
+						"SELECT w, l from TableNumber1, LATERAL TABLE(tableUDF(StringField1)) as T(w, l)");
+		final List<String> actualResults =
+				retrieveChangelogResult(executor, session, rd.getResultId());
+		final List<String> expectedResults = new ArrayList<>();
+		expectedResults.add("(true,Hello,10)");
+		expectedResults.add("(true,World,10)");
+		expectedResults.add("(true,Hello,10)");
+		expectedResults.add("(true,World,10)");
+		expectedResults.add("(true,Hello,10)");
+		expectedResults.add("(true,World,10)");
+		expectedResults.add("(true,Hello,10)");
+		expectedResults.add("(true,World,10)");
+		expectedResults.add("(true,Hello,10)");
+		expectedResults.add("(true,World,10)");
+		expectedResults.add("(true,Hello,10)");
+		expectedResults.add("(true,World!!!!,14)");
+		TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
+	}
+
+	@Test(timeout = 30_000L)
 	public void testStreamQueryExecutionChangelog() throws Exception {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
 		Objects.requireNonNull(url);
@@ -167,20 +230,8 @@ public class LocalExecutorITCase extends TestLogger {
 
 			assertFalse(desc.isMaterialized());
 
-			final List<String> actualResults = new ArrayList<>();
-
-			while (true) {
-				Thread.sleep(50); // slow the processing down
-				final TypedResult<List<Tuple2<Boolean, Row>>> result =
-					executor.retrieveResultChanges(session, desc.getResultId());
-				if (result.getType() == TypedResult.ResultType.PAYLOAD) {
-					for (Tuple2<Boolean, Row> change : result.getPayload()) {
-						actualResults.add(change.toString());
-					}
-				} else if (result.getType() == TypedResult.ResultType.EOS) {
-					break;
-				}
-			}
+			final List<String> actualResults =
+					retrieveChangelogResult(executor, session, desc.getResultId());
 
 			final List<String> expectedResults = new ArrayList<>();
 			expectedResults.add("(true,42,Hello World)");
@@ -266,19 +317,32 @@ public class LocalExecutorITCase extends TestLogger {
 	}
 
 	private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception {
+		return createDefaultExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient);
+	}
+
+	private <T> LocalExecutor createDefaultExecutor(
+			String configFileName, ClusterClient<T>
+			clusterClient) throws Exception {
 		return new LocalExecutor(
-			EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")),
+			EnvironmentFileUtil.parseModified(configFileName, Collections.singletonMap("$VAR_2", "batch")),
 			Collections.emptyList(),
 			clusterClient.getFlinkConfiguration(),
 			new DummyCustomCommandLine<T>(clusterClient));
 	}
 
 	private <T> LocalExecutor createModifiedExecutor(ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception {
+		return createModifiedExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient, replaceVars);
+	}
+
+	private <T> LocalExecutor createModifiedExecutor(
+			String configFileName,
+			ClusterClient<T> clusterClient,
+			Map<String, String>	replaceVars) throws Exception {
 		return new LocalExecutor(
-			EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
-			Collections.emptyList(),
-			clusterClient.getFlinkConfiguration(),
-			new DummyCustomCommandLine<T>(clusterClient));
+				EnvironmentFileUtil.parseModified(configFileName, replaceVars),
+				Collections.emptyList(),
+				clusterClient.getFlinkConfiguration(),
+				new DummyCustomCommandLine<T>(clusterClient));
 	}
 
 	private List<String> retrieveTableResult(
@@ -304,4 +368,24 @@ public class LocalExecutorITCase extends TestLogger {
 
 		return actualResults;
 	}
+
+	private List<String> retrieveChangelogResult(
+			Executor executor,
+			SessionContext session,
+			String resultID) throws InterruptedException {
+		final List<String> actualResults = new ArrayList<>();
+		while (true) {
+			Thread.sleep(50); // slow the processing down
+			final TypedResult<List<Tuple2<Boolean, Row>>> result =
+					executor.retrieveResultChanges(session, resultID);
+			if (result.getType() == TypedResult.ResultType.PAYLOAD) {
+				for (Tuple2<Boolean, Row> change : result.getPayload()) {
+					actualResults.add(change.toString());
+				}
+			} else if (result.getType() == TypedResult.ResultType.EOS) {
+				break;
+			}
+		}
+		return actualResults;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
new file mode 100644
index 0000000..b93ef66
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+/**
+ * A bunch of UDFs for SQL-Client test.
+ */
+public class UserDefinedFunctions {
+
+	/**
+	 * The scalar function for SQL-Client test.
+	 */
+	public static class ScalarUDF extends ScalarFunction {
+
+		private int offset;
+
+		public ScalarUDF(Integer offset) {
+			this.offset = offset;
+		}
+
+		public String eval(Integer i) {
+			return String.valueOf(i + offset);
+		}
+	}
+
+	/**
+	 * The aggregate function for SQL-Client test.
+	 */
+	public static class AggregateUDF extends AggregateFunction<Long, Long> {
+
+		public AggregateUDF(String name, Boolean flag, Integer value) {
+
+		}
+
+		@Override
+		public Long createAccumulator() {
+			return 0L;
+		}
+
+		@Override
+		public Long getValue(Long accumulator) {
+			return 100L;
+		}
+
+		public void accumulate(Long acc, Long value) {
+
+		}
+
+		@Override
+		public TypeInformation<Long> getResultType() {
+			return BasicTypeInfo.LONG_TYPE_INFO;
+		}
+	}
+
+	/**
+	 * The table function for SQL-Client test.
+	 */
+	public static class TableUDF extends TableFunction<Row> {
+		private long extra = 2L;
+
+		public TableUDF(Long extra) {
+			this.extra = extra;
+		}
+
+		public void eval(String str) {
+			for (String s : str.split(" ")) {
+				Row r = new Row(2);
+				r.setField(0, s);
+				r.setField(1, s.length() + extra);
+				collect(r);
+			}
+		}
+
+		@Override
+		public TypeInformation<Row> getResultType() {
+			return Types.ROW(Types.STRING(), Types.LONG());
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
new file mode 100644
index 0000000..3e16030
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
@@ -0,0 +1,82 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+#==============================================================================
+# TEST ENVIRONMENT FILE
+# Intended for org.apache.flink.table.client.gateway.local.LocalExecutorITCase.
+#==============================================================================
+
+# this file has variables that can be filled with content by replacing $VAR_XXX
+
+tables:
+  - name: TableNumber1
+    type: source
+    schema:
+      - name: IntegerField1
+        type: INT
+      - name: StringField1
+        type: VARCHAR
+    connector:
+      type: filesystem
+      path: "$VAR_0"
+    format:
+      type: csv
+      fields:
+        - name: IntegerField1
+          type: INT
+        - name: StringField1
+          type: VARCHAR
+      line-delimiter: "\n"
+      comment-prefix: "#"
+
+functions:
+  - name: scalarUDF
+    from: class
+    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF
+    constructor:
+      - 5
+
+  - name: aggregateUDF
+    from: class
+    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF
+    constructor:
+      - StarryName
+      - false
+      - class: java.lang.Integer
+        constructor:
+          - class: java.lang.String
+            constructor:
+              - type: VARCHAR
+                value: 3
+
+  - name: tableUDF
+    from: class
+    class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF
+    constructor:
+      - type: LONG
+        value: 5
+
+execution:
+  type: streaming
+  parallelism: 1
+  result-mode: changelog
+
+deployment:
+  response-timeout: 5000
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
new file mode 100644
index 0000000..78ceb3a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Descriptor for a class type.
+  *
+  * @param className the full name of the class (e.g., java.lang.Integer)
+  */
+class ClassType(var className: Option[String] = None)
+  extends HierarchyDescriptor {
+
+  // the parameter is either a primitive type or a class type
+  private val constructor: ArrayBuffer[Either[PrimitiveType[_], ClassType]] =
+    ArrayBuffer()
+
+  /**
+    * Sets the class name for the descriptor.
+    */
+  def of(name: String): ClassType = {
+    this.className = Option(name)
+    this
+  }
+
+  /**
+    * Adds the given string formatted value as a parameter, the type of which will be automatically
+    * derived (e.g., "true" -> Boolean, "1" -> Integer, "2.0" -> Double and "abc" -> String).
+    *
+    */
+  def strParam(valueStr: String): ClassType = {
+    val typeString = PrimitiveTypeValidator.deriveTypeStrFromValueStr(valueStr)
+    param(typeString, valueStr)
+  }
+
+  /**
+    * Adds the given string formatted value as a parameter, the type of which will be decided by the
+    * given type string (e.g., "DOUBLE", "VARCHAR").
+    */
+  def param(typeStr: String, valueStr: String): ClassType = {
+    param(TypeStringUtils.readTypeInfo(typeStr), valueStr)
+  }
+
+  /**
+    * Adds the give string formatted value as a parameter, the type of which will be defined by the
+    * given type information.
+    */
+  def param[T](typeInfo: TypeInformation[T], valueStr: String): ClassType = {
+    constructor += Left(
+      new PrimitiveType[T]()
+        .of(typeInfo)
+        .value(PrimitiveTypeValidator.deriveTypeAndValueStr(typeInfo, valueStr)))
+    this
+  }
+
+  /**
+    * Adds the give value as a parameter, the type of which will be automatically derived.
+    */
+  def param[T](value: T): ClassType = {
+    constructor += Left(
+      new PrimitiveType[T]()
+        .of(TypeInformation.of(value.getClass.asInstanceOf[Class[T]]))
+        .value(value))
+    this
+  }
+
+  /**
+    * Adds a parameter defined by the given class type descriptor.
+    */
+  def param(field: ClassType): ClassType = {
+    constructor += Right(field)
+    this
+  }
+
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit =  {
+    addPropertiesWithPrefix("", properties)
+  }
+
+  override private[flink] def addPropertiesWithPrefix(
+      keyPrefix: String,
+      properties: DescriptorProperties): Unit = {
+    className.foreach(properties.putString(s"$keyPrefix${ClassTypeValidator.CLASS}", _))
+    var i = 0
+    while (i < constructor.size) {
+      constructor(i) match {
+        case Left(basicType) =>
+          basicType.addPropertiesWithPrefix(
+            s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
+            properties)
+        case Right(classType) =>
+          classType.addPropertiesWithPrefix(
+            s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
+            properties)
+      }
+      i += 1
+    }
+  }
+}
+
+object ClassType {
+  def apply() = new ClassType
+  def apply(className: String) = new ClassType(Option(className))
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
new file mode 100644
index 0000000..ebb48b5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+
+import scala.collection.JavaConversions._
+
+/**
+  * Validator for [[ClassType]].
+  */
+class ClassTypeValidator extends HierarchyDescriptorValidator {
+  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+
+    properties.validateString(s"$keyPrefix${ClassTypeValidator.CLASS}", isOptional = false)
+
+    val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}"
+    normalizeConstructorParams(constructorPrefix, properties)
+
+    val constructorProps =
+      properties.getVariableIndexedProperties(constructorPrefix, List())
+    var i = 0
+    val primitiveTypeValidator = new PrimitiveTypeValidator
+    while (i < constructorProps.size()) {
+      if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) {
+        primitiveTypeValidator.validateWithPrefix(s"$constructorPrefix.$i.", properties)
+      } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) {
+        validateWithPrefix(s"$constructorPrefix.$i.", properties)
+      } else {
+        throw ValidationException("A constructor field must contain a 'class' or a 'type' key.")
+      }
+      i += 1
+    }
+  }
+
+  /**
+    * For each constructor parameter (e.g., constructor.0 = abc), we derive its type and replace it
+    * with the normalized form (e.g., constructor.0.type = VARCHAR, constructor.0.value = abc);
+    *
+    * @param constructorPrefix the prefix to get the constructor parameters
+    * @param properties the descriptor properties
+    */
+  def normalizeConstructorParams(
+    constructorPrefix: String,
+    properties: DescriptorProperties): Unit = {
+    val constructorValues = properties.getListProperties(constructorPrefix)
+    constructorValues.foreach(kv => {
+      properties.unsafeRemove(kv._1)
+      val tp = PrimitiveTypeValidator.deriveTypeStrFromValueStr(kv._2)
+      properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_TYPE}", tp)
+      properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_VALUE}", kv._2)
+    })
+  }
+}
+
+object ClassTypeValidator {
+  val CLASS = "class"
+  val CONSTRUCTOR = "constructor"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 4b0b60d..8d410d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.descriptors
 
 import java.io.Serializable
-import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
 import java.util
 import java.util.function.{Consumer, Supplier}
 import java.util.regex.Pattern
@@ -238,10 +239,25 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
+    * Returns a big decimal value under the given key if it exists.
+    */
+  def getOptionalBigDecimal(key: String): Optional[JBigDecimal] = {
+    val value = properties.get(key).map(new JBigDecimal(_))
+    toJava(value)
+  }
+
+  /**
+    * Returns a big decimal value under the given existing key.
+    */
+  def getBigDecimal(key: String): BigDecimal = {
+    getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
     * Returns a boolean value under the given key if it exists.
     */
   def getOptionalBoolean(key: String): Optional[JBoolean] = {
-    val value = properties.get(key).map(JBoolean.parseBoolean(_)).map(Boolean.box)
+    val value = properties.get(key).map(JBoolean.parseBoolean).map(Boolean.box)
     toJava(value)
   }
 
@@ -253,10 +269,55 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
+    * Returns a byte value under the given key if it exists.
+    */
+  def getOptionalByte(key: String): Optional[JByte] = {
+    val value = properties.get(key).map(JByte.parseByte).map(Byte.box)
+    toJava(value)
+  }
+
+  /**
+    * Returns a byte value under the given existing key.
+    */
+  def getByte(key: String): Byte = {
+    getOptionalByte(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
+    * Returns a double value under the given key if it exists.
+    */
+  def getOptionalDouble(key: String): Optional[JDouble] = {
+    val value = properties.get(key).map(JDouble.parseDouble).map(Double.box)
+    toJava(value)
+  }
+
+  /**
+    * Returns a double value under the given key if it exists.
+    */
+  def getDouble(key: String): Double = {
+    getOptionalDouble(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
+    * Returns a float value under the given key if it exists.
+    */
+  def getOptionalFloat(key: String): Optional[JFloat] = {
+    val value = properties.get(key).map(JFloat.parseFloat).map(Float.box)
+    toJava(value)
+  }
+
+  /**
+    * Returns a float value under the given key if it exists.
+    */
+  def getFloat(key: String): Float = {
+    getOptionalFloat(key).orElseThrow(exceptionSupplier(key))
+  }
+
+  /**
     * Returns an integer value under the given key if it exists.
     */
   def getOptionalInt(key: String): Optional[JInt] = {
-    val value = properties.get(key).map(JInt.parseInt(_)).map(Int.box)
+    val value = properties.get(key).map(JInt.parseInt).map(Int.box)
     toJava(value)
   }
 
@@ -271,7 +332,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     * Returns a long value under the given key if it exists.
     */
   def getOptionalLong(key: String): Optional[JLong] = {
-    val value = properties.get(key).map(JLong.parseLong(_)).map(Long.box)
+    val value = properties.get(key).map(JLong.parseLong).map(Long.box)
     toJava(value)
   }
 
@@ -283,18 +344,18 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
-    * Returns a double value under the given key if it exists.
+    * Returns a short value under the given key if it exists.
     */
-  def getOptionalDouble(key: String): Optional[JDouble] = {
-    val value = properties.get(key).map(JDouble.parseDouble(_)).map(Double.box)
+  def getOptionalShort(key: String): Optional[JShort] = {
+    val value = properties.get(key).map(JShort.parseShort).map(Short.box)
     toJava(value)
   }
 
   /**
-    * Returns a double value under the given key if it exists.
+    * Returns a short value under the given existing key.
     */
-  def getDouble(key: String): Double = {
-    getOptionalDouble(key).orElseThrow(exceptionSupplier(key))
+  def getShort(key: String): Short = {
+    getOptionalShort(key).orElseThrow(exceptionSupplier(key))
   }
 
   /**
@@ -470,6 +531,16 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   }
 
   /**
+    * Returns all properties under a group of array formed keys.
+    *
+    * E.g. constructor -> returns all constructor.# properties.
+    */
+  def getListProperties(key: String): JMap[String, String] = {
+    val escapedKey = Pattern.quote(key)
+    properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+")).asJava
+  }
+
+  /**
     * Returns all properties under a given key that contains an index in between.
     *
     * E.g. rowtime.0.name -> returns all rowtime.#.name properties
@@ -566,24 +637,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       min: Int, // inclusive
       max: Int) // inclusive
     : Unit = {
-
-    if (!properties.contains(key)) {
-      if (!isOptional) {
-        throw new ValidationException(s"Could not find required property '$key'.")
-      }
-    } else {
-      try {
-        val value = Integer.parseInt(properties(key))
-        if (value < min || value > max) {
-          throw new ValidationException(s"Property '$key' must be an integer value between $min " +
-            s"and $max but was: ${properties(key)}")
-        }
-      } catch {
-        case _: NumberFormatException =>
-          throw new ValidationException(
-            s"Property '$key' must be an integer value but was: ${properties(key)}")
-      }
-    }
+    validateComparable(key, isOptional, new Integer(min), new Integer(max), Integer.valueOf)
   }
 
   /**
@@ -616,24 +670,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       min: Long, // inclusive
       max: Long) // inclusive
     : Unit = {
-
-    if (!properties.contains(key)) {
-      if (!isOptional) {
-        throw new ValidationException(s"Could not find required property '$key'.")
-      }
-    } else {
-      try {
-        val value = JLong.parseLong(properties(key))
-        if (value < min || value > max) {
-          throw new ValidationException(s"Property '$key' must be a long value between $min " +
-            s"and $max but was: ${properties(key)}")
-        }
-      } catch {
-        case _: NumberFormatException =>
-          throw new ValidationException(
-            s"Property '$key' must be a long value but was: ${properties(key)}")
-      }
-    }
+    validateComparable(key, isOptional, new JLong(min), new JLong(max), JLong.valueOf)
   }
 
   /**
@@ -699,22 +736,165 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       min: Double, // inclusive
       max: Double) // inclusive
     : Unit = {
+    validateComparable(key, isOptional, new JDouble(min), new JDouble(max), JDouble.valueOf)
+  }
 
+  /**
+    * Validates a big decimal property.
+    */
+  def validateBigDecimal(
+      key: String,
+      isOptional: Boolean): Unit = {
+
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      try {
+        new JBigDecimal(properties(key))
+      } catch {
+        case _: NumberFormatException =>
+          throw new ValidationException(
+            s"Property '$key' must be a big decimal value but was: ${properties(key)}")
+      }
+    }
+  }
+
+  /**
+    * Validates a big decimal property. The boundaries are inclusive.
+    */
+  def validateBigDecimal(
+      key: String,
+      isOptional: Boolean,
+      min: BigDecimal, // inclusive
+      max: BigDecimal) // inclusive
+  : Unit = {
+    validateComparable(
+      key,
+      isOptional,
+      min.bigDecimal,
+      max.bigDecimal,
+      (value: String) => new JBigDecimal(value))
+  }
+
+  /**
+    * Validates a byte property.
+    */
+  def validateByte(
+      key: String,
+      isOptional: Boolean): Unit = validateDouble(key, isOptional, Byte.MinValue, Byte.MaxValue)
+
+  /**
+    * Validates a byte property. The boundaries are inclusive.
+    */
+  def validateByte(
+      key: String,
+      isOptional: Boolean,
+      min: Byte) // inclusive
+  : Unit = {
+    validateByte(key, isOptional, min, Byte.MaxValue)
+  }
+
+  /**
+    * Validates a byte property. The boundaries are inclusive.
+    */
+  def validateByte(
+      key: String,
+      isOptional: Boolean,
+      min: Byte, // inclusive
+      max: Byte) // inclusive
+  : Unit = {
+    validateComparable(key, isOptional, new JByte(min), new JByte(max), JByte.valueOf)
+  }
+
+  /**
+    * Validates a float property.
+    */
+  def validateFloat(
+      key: String,
+      isOptional: Boolean): Unit = validateFloat(key, isOptional, Float.MinValue, Float.MaxValue)
+
+  /**
+    * Validates a float property. The boundaries are inclusive.
+    */
+  def validateFloat(
+      key: String,
+      isOptional: Boolean,
+      min: Float) // inclusive
+  : Unit = {
+    validateFloat(key, isOptional, min, Float.MaxValue)
+  }
+
+  /**
+    * Validates a float property. The boundaries are inclusive.
+    */
+  def validateFloat(
+      key: String,
+      isOptional: Boolean,
+      min: Float, // inclusive
+      max: Float) // inclusive
+  : Unit = {
+    validateComparable(key, isOptional, new JFloat(min), new JFloat(max), JFloat.valueOf)
+  }
+
+  /**
+    * Validates a short property.
+    */
+  def validateShort(
+      key: String,
+      isOptional: Boolean): Unit = validateFloat(key, isOptional, Short.MinValue, Short.MaxValue)
+
+  /**
+    * Validates a short property. The boundaries are inclusive.
+    */
+  def validateFloat(
+      key: String,
+      isOptional: Boolean,
+      min: Short) // inclusive
+  : Unit = {
+    validateShort(key, isOptional, min, Short.MaxValue)
+  }
+
+  /**
+    * Validates a float property. The boundaries are inclusive.
+    */
+  def validateShort(
+      key: String,
+      isOptional: Boolean,
+      min: Short, // inclusive
+      max: Short) // inclusive
+  : Unit = {
+    validateComparable(key, isOptional, new JShort(min), new JShort(max), JShort.valueOf)
+  }
+
+  /**
+    * Validates a property by first parsing the string value to a comparable object.
+    * The boundaries are inclusive.
+    */
+  private def validateComparable[T <: Comparable[T]](
+      key: String,
+      isOptional: Boolean,
+      min: T,
+      max: T,
+      parseFunction: String => T)
+  : Unit = {
     if (!properties.contains(key)) {
       if (!isOptional) {
         throw new ValidationException(s"Could not find required property '$key'.")
       }
     } else {
       try {
-        val value = JDouble.parseDouble(properties(key))
-        if (value < min || value > max) {
-          throw new ValidationException(s"Property '$key' must be a double value between $min " +
-            s"and $max but was: ${properties(key)}")
+        val value = parseFunction(properties(key))
+
+        if (value.compareTo(min) < 0 || value.compareTo(max) > 0) {
+          throw new ValidationException(s"Property '$key' must be a ${min.getClass.getSimpleName}" +
+            s" value between $min and $max but was: ${properties(key)}")
         }
       } catch {
         case _: NumberFormatException =>
           throw new ValidationException(
-            s"Property '$key' must be an double value but was: ${properties(key)}")
+            s"Property '$key' must be a byte value but was: ${properties(key)}")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
new file mode 100644
index 0000000..f4c8363
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Descriptor for describing a function that can be instantiated from somewhere (e.g., a class).
+  *
+  * @param name name of the function
+  */
+class FunctionDescriptor(var name: String) extends Descriptor {
+
+  var classDescriptor: Option[ClassType] = None
+
+  /**
+    * Uses the class provided by the descriptor to instantiate the function.
+    */
+  def using(classDescriptor: ClassType): FunctionDescriptor = {
+    this.classDescriptor = Option(classDescriptor)
+    this
+  }
+
+  def getDescriptorProperties: DescriptorProperties = {
+    val descriptorProperties = new DescriptorProperties()
+    addProperties(descriptorProperties)
+    descriptorProperties
+  }
+
+  override def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putString(FunctionValidator.FUNCTION_NAME, name)
+    classDescriptor.foreach(_.addProperties(properties))
+  }
+}
+
+object FunctionDescriptor {
+  def apply(name: String): FunctionDescriptor = new FunctionDescriptor(name)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
new file mode 100644
index 0000000..f36b9c2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.functions.UserDefinedFunction
+
+import scala.collection.JavaConverters._
+
+/**
+  * Validator for [[FunctionDescriptor]].
+  */
+class FunctionValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateString(FunctionValidator.FUNCTION_NAME, isOptional = false, 1)
+    new ClassTypeValidator().validate(properties)
+  }
+}
+
+object FunctionValidator {
+
+  val FUNCTION_NAME = "name"
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
new file mode 100644
index 0000000..f97c807
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A descriptor that may exist in an arbitrary level (be recursively included by other
+  * descriptors).
+  */
+abstract class HierarchyDescriptor extends Descriptor {
+
+  /**
+    * Internal method for properties conversion. All the property keys will be prefixed according
+    * to the level.
+    */
+  private[flink] def addPropertiesWithPrefix(
+    keyPrefix: String, properties: DescriptorProperties): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
new file mode 100644
index 0000000..73dd1f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Validator for a [[HierarchyDescriptor]].
+  */
+trait HierarchyDescriptorValidator extends DescriptorValidator{
+
+  def validate(properties: DescriptorProperties): Unit = {
+    validateWithPrefix("", properties)
+  }
+
+  /**
+    * Performs validation with a prefix for the keys.
+    */
+  def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala
new file mode 100644
index 0000000..9e5fcab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+  * Descriptor for a primitive type. Use internally only.
+  */
+class PrimitiveType[T] extends HierarchyDescriptor {
+
+  var typeInformation: TypeInformation[T] = _
+  var value: T = _
+
+  def of(basicType: TypeInformation[T]): PrimitiveType[T] = {
+    typeInformation = basicType
+    this
+  }
+
+  def value(value: T): PrimitiveType[T] = {
+    this.value = value
+    this
+  }
+
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    addPropertiesWithPrefix("", properties)
+  }
+
+  override private[flink] def addPropertiesWithPrefix(
+      keyPrefix: String, properties: DescriptorProperties): Unit = {
+    properties.putString(keyPrefix + "type", TypeStringUtils.writeTypeInfo(typeInformation))
+    properties.putString(keyPrefix + "value", value.toString)
+  }
+}
+
+object PrimitiveType {
+  def apply[T]() = new PrimitiveType[T]()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
new file mode 100644
index 0000000..9b2f776
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableException, Types}
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+  * Validator for [[PrimitiveType]].
+  */
+class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
+
+  /*
+   * TODO The following types need to be supported next.
+   * Types.SQL_DATE
+   * Types.SQL_TIME
+   * Types.SQL_TIMESTAMP
+   * Types.INTERVAL_MONTHS
+   * Types.INTERVAL_MILLIS
+   * Types.PRIMITIVE_ARRAY
+   * Types.OBJECT_ARRAY
+   * Types.MAP
+   * Types.MULTISET
+   */
+
+  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+    val typeKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}"
+    val valueKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}"
+
+    properties.validateType(typeKey, isOptional = false)
+
+    val typeInfo: TypeInformation[_] =
+      properties.getType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}")
+    typeInfo match {
+      case Types.DECIMAL => properties.validateBigDecimal(valueKey, isOptional = false)
+      case Types.BOOLEAN => properties.validateBoolean(valueKey, isOptional = false)
+      case Types.BYTE => properties.validateByte(valueKey, isOptional = false)
+      case Types.DOUBLE => properties.validateDouble(valueKey, isOptional = false)
+      case Types.FLOAT => properties.validateFloat(valueKey, isOptional = false)
+      case Types.INT => properties.validateInt(valueKey, isOptional = false)
+      case Types.LONG => properties.validateLong(valueKey, isOptional = false)
+      case Types.SHORT => properties.validateShort(valueKey, isOptional = false)
+      case Types.STRING => properties.validateString(valueKey, isOptional = false)
+      case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
+    }
+  }
+}
+
+object PrimitiveTypeValidator {
+  val PRIMITIVE_TYPE = "type"
+  val PRIMITIVE_VALUE = "value"
+
+  private val LITERAL_FALSE = "false"
+  private val LITERAL_TRUE = "true"
+
+  /**
+    * Derives the value according to the type and value strings.
+    *
+    * @param keyPrefix the prefix of the primitive type key
+    * @param properties the descriptor properties
+    * @return the derived value
+    */
+  def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
+    val typeInfo =
+      properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
+    val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
+    val value = typeInfo match {
+      case Types.DECIMAL => properties.getBigDecimal(valueKey)
+      case Types.BOOLEAN => properties.getBoolean(valueKey)
+      case Types.BYTE => properties.getByte(valueKey)
+      case Types.DOUBLE => properties.getDouble(valueKey)
+      case Types.FLOAT => properties.getFloat(valueKey)
+      case Types.INT => properties.getInt(valueKey)
+      case Types.LONG => properties.getLong(valueKey)
+      case Types.SHORT => properties.getShort(valueKey)
+      case Types.STRING => properties.getString(valueKey)
+      case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
+    }
+    value
+  }
+
+  /**
+    * Derives the actually value with the type information and string formatted value.
+    */
+  def deriveTypeAndValueStr[T](typeInfo: TypeInformation[T], valueStr: String): T = {
+    typeInfo match {
+      case Types.DECIMAL => new JBigDecimal(valueStr).asInstanceOf[T]
+      case Types.BOOLEAN => JBoolean.parseBoolean(valueStr).asInstanceOf[T]
+      case Types.BYTE => JByte.parseByte(valueStr).asInstanceOf[T]
+      case Types.DOUBLE => JDouble.parseDouble(valueStr).asInstanceOf[T]
+      case Types.FLOAT => JFloat.parseFloat(valueStr).asInstanceOf[T]
+      case Types.INT => JInt.parseInt(valueStr).asInstanceOf[T]
+      case Types.LONG => JLong.parseLong(valueStr).asInstanceOf[T]
+      case Types.SHORT => JShort.parseShort(valueStr).asInstanceOf[T]
+      case Types.STRING => valueStr.asInstanceOf[T]
+      case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
+    }
+  }
+
+/**
+  * Tries to derive the type string from the given string value.
+  * The derive priority for the types are BOOLEAN, INT, DOUBLE, and VARCHAR.
+    *
+    * @param valueStr the string formatted value
+    * @return the type string of the given value
+    */
+  def deriveTypeStrFromValueStr(valueStr: String): String = {
+    if (valueStr.equals(LITERAL_TRUE) || valueStr.equals(LITERAL_FALSE)) {
+      TypeStringUtils.BOOLEAN.key
+    } else {
+      try {
+        valueStr.toInt
+        TypeStringUtils.INT.key
+      } catch {
+        case _: NumberFormatException =>
+          try {
+            valueStr.toDouble
+            TypeStringUtils.DOUBLE.key
+          } catch {
+            case _: NumberFormatException =>
+              TypeStringUtils.STRING.key
+          }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala
new file mode 100644
index 0000000..9e97817
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors.service
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.{ClassTypeValidator, DescriptorProperties, FunctionDescriptor, FunctionValidator, PrimitiveTypeValidator}
+import org.apache.flink.table.functions.UserDefinedFunction
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+/**
+  * Utils that serve [[FunctionDescriptor]].
+  */
+object FunctionService {
+   /**
+     * Generates a user-defined function with the given properties.
+     *
+     * @param properties the descriptor properties that belongs to a [[FunctionDescriptor]]
+     * @param classLoader the class loader to load the function and its parameter's classes
+     * @return the generated user-defined function
+     */
+   def generateUserDefinedFunction(
+      properties: DescriptorProperties,
+      classLoader: ClassLoader): UserDefinedFunction = {
+      new FunctionValidator().validate(properties)
+      generateInstance[UserDefinedFunction]("", properties, classLoader)
+   }
+
+   /**
+     * Recursively generate an instance of a class according the given properties.
+     *
+     * @param keyPrefix the prefix to fetch properties
+     * @param descriptorProps the descriptor properties that contains the class type information
+     * @param classLoader the class loader to load the class
+     * @tparam T type fo the generated instance
+     * @return an instance of the class
+     */
+   def generateInstance[T](
+      keyPrefix: String,
+      descriptorProps: DescriptorProperties,
+      classLoader: ClassLoader): T = {
+      val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}"
+      val constructorProps =
+         descriptorProps.getVariableIndexedProperties(constructorPrefix, List())
+      var i = 0
+      val typeValueList: ArrayBuffer[(Class[_], Any)] = new ArrayBuffer
+      while (i < constructorProps.size()) {
+         if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) {
+            val primitiveVal = PrimitiveTypeValidator
+              .derivePrimitiveValue(s"$constructorPrefix.$i.", descriptorProps)
+            typeValueList += ((primitiveVal.getClass, primitiveVal))
+         } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) {
+            val typeValuePair = (
+              Class.forName(
+                 descriptorProps.getString(constructorProps(i).get(ClassTypeValidator.CLASS))),
+              generateInstance(s"$constructorPrefix.$i.", descriptorProps, classLoader))
+            typeValueList += typeValuePair
+         }
+         i += 1
+      }
+      val clazz = classLoader
+        .loadClass(descriptorProps.getString(s"$keyPrefix${ClassTypeValidator.CLASS}"))
+      val constructor = clazz.getConstructor(typeValueList.map(_._1): _*)
+      if (null == constructor) {
+         throw TableException(s"Cannot find a constructor with parameter types " +
+           s"${typeValueList.map(_._1)} for ${clazz.getName}")
+      }
+      constructor.newInstance(typeValueList.map(_._2.asInstanceOf[AnyRef]): _*).asInstanceOf[T]
+   }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala
new file mode 100644
index 0000000..43710f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{List => JList, Map => JMap, Arrays => JArrays}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class ClassTypeTest extends DescriptorTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingClass(): Unit = {
+    removePropertyAndVerify(descriptors().get(0), ClassTypeValidator.CLASS)
+  }
+
+  override def descriptors(): JList[Descriptor] = {
+    val desc1 = ClassType("class1")
+      .param(BasicTypeInfo.LONG_TYPE_INFO, "1")
+      .param(
+        ClassType("class2")
+          .param(
+            ClassType("class3")
+              .param("StarryNight")
+              .param(
+                ClassType("class4"))))
+      .param(2L)
+
+    val desc2 = ClassType().of("class2")
+
+    JArrays.asList(desc1, desc2)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new ClassTypeValidator()
+  }
+
+  override def properties(): JList[JMap[String, String]] = {
+    val props1 = Map(
+      "class" -> "class1",
+      "constructor.0.type" -> "BIGINT",
+      "constructor.0.value" -> "1",
+      "constructor.1.class" -> "class2",
+      "constructor.1.constructor.0.class" -> "class3",
+      "constructor.1.constructor.0.constructor.0.type" -> "VARCHAR",
+      "constructor.1.constructor.0.constructor.0.value" -> "StarryNight",
+      "constructor.1.constructor.0.constructor.1.class" -> "class4",
+      "constructor.2.type" -> "BIGINT",
+      "constructor.2.value" -> "2"
+    )
+
+    val props2 = Map(
+      "class" -> "class2"
+    )
+
+    JArrays.asList(props1.asJava, props2.asJava)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
new file mode 100644
index 0000000..3b93ca25
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{List => JList, Map => JMap, Arrays => JArrays}
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class FunctionTest extends DescriptorTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingName(): Unit = {
+    removePropertyAndVerify(descriptors().get(0), "name")
+  }
+
+  override def descriptors(): JList[Descriptor] = {
+    val desc1 = FunctionDescriptor("func1")
+      .using(
+        ClassType("another.class")
+          .of("my.class")
+          .param("INT", "1")
+          .param(
+            ClassType()
+              .of("my.class2")
+              .strParam("true")))
+
+    JArrays.asList(desc1)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new FunctionValidator()
+  }
+
+  override def properties(): JList[JMap[String, String]] = {
+    val props1 = Map(
+      "name" -> "func1",
+      "class" -> "my.class",
+      "constructor.0.type" -> "INT",
+      "constructor.0.value" -> "1",
+      "constructor.1.class" -> "my.class2",
+      "constructor.1.constructor.0.type" -> "BOOLEAN",
+      "constructor.1.constructor.0.value" -> "true"
+    )
+    JArrays.asList(props1.asJava)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
new file mode 100644
index 0000000..f684e2a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{Arrays => JArrays, List => JList, Map => JMap}
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class PrimitiveTypeTest extends DescriptorTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingType(): Unit = {
+    removePropertyAndVerify(descriptors().get(0), PrimitiveTypeValidator.PRIMITIVE_TYPE)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingValue(): Unit = {
+    removePropertyAndVerify(descriptors().get(0), PrimitiveTypeValidator.PRIMITIVE_VALUE)
+  }
+
+  override def descriptors(): JList[Descriptor] = {
+    val bigDecimalDesc = PrimitiveType().of(Types.DECIMAL).value(new JBigDecimal(1))
+    val booleanDesc = PrimitiveType().of(Types.BOOLEAN).value(false)
+    val byteDesc = PrimitiveType().of(Types.BYTE).value(4.asInstanceOf[Byte])
+    val doubleDesc = PrimitiveType().of(Types.DOUBLE).value(7.0)
+    val floatDesc = PrimitiveType().of(Types.FLOAT).value(8f)
+    val intDesc = PrimitiveType().of(Types.INT).value(9)
+    val longDesc = PrimitiveType().of(Types.LONG).value(10L)
+    val shortDesc = PrimitiveType().of(Types.SHORT).value(11.asInstanceOf[Short])
+    val stringDesc = PrimitiveType().of(Types.STRING).value("12")
+
+    JArrays.asList(
+      bigDecimalDesc,
+      booleanDesc,
+      byteDesc,
+      doubleDesc,
+      floatDesc,
+      intDesc,
+      longDesc,
+      shortDesc,
+      stringDesc)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new PrimitiveTypeValidator()
+  }
+
+  override def properties(): JList[JMap[String, String]] = {
+    val bigDecimalProps = Map(
+      "type" -> "DECIMAL",
+      "value" -> "1"
+    )
+    val booleanDesc = Map(
+      "type" -> "BOOLEAN",
+      "value" -> "false"
+    )
+    val byteDesc = Map(
+      "type" -> "TINYINT",
+      "value" -> "4"
+    )
+
+    val doubleDesc = Map(
+      "type" -> "DOUBLE",
+      "value" -> "7.0"
+    )
+    val floatDesc = Map(
+      "type" -> "FLOAT",
+      "value" -> "8.0"
+    )
+    val intProps = Map(
+      "type" -> "INT",
+      "value" -> "9"
+    )
+    val longDesc = Map(
+      "type" -> "BIGINT",
+      "value" -> "10"
+    )
+    val shortDesc = Map(
+      "type" -> "SMALLINT",
+      "value" -> "11"
+    )
+    val stringDesc = Map(
+      "type" -> "VARCHAR",
+      "value" -> "12"
+    )
+    JArrays.asList(
+      bigDecimalProps.asJava,
+      booleanDesc.asJava,
+      byteDesc.asJava,
+      doubleDesc.asJava,
+      floatDesc.asJava,
+      intProps.asJava,
+      longDesc.asJava,
+      shortDesc.asJava,
+      stringDesc.asJava)
+  }
+}