You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/10 12:42:13 UTC
[3/3] flink git commit: [FLINK-8863] [sql-client] Add user-defined
function support in SQL Client
[FLINK-8863] [sql-client] Add user-defined function support in SQL Client
This closes #6090.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8014dade
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8014dade
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8014dade
Branch: refs/heads/master
Commit: 8014dade4ba844786afa85fd517d6aba807dbc81
Parents: 79b38f8
Author: Xingcan Cui <xi...@gmail.com>
Authored: Sun May 27 23:36:25 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Jul 10 13:55:21 2018 +0200
----------------------------------------------------------------------
.../flink/table/client/config/Environment.java | 20 ++
.../client/config/UserDefinedFunction.java | 98 +++++++
.../client/gateway/local/ExecutionContext.java | 44 +++
.../gateway/local/LocalExecutorITCase.java | 122 ++++++--
.../gateway/utils/UserDefinedFunctions.java | 105 +++++++
.../src/test/resources/test-sql-client-udf.yaml | 82 ++++++
.../flink/table/descriptors/ClassType.scala | 123 ++++++++
.../table/descriptors/ClassTypeValidator.scala | 75 +++++
.../descriptors/DescriptorProperties.scala | 282 +++++++++++++++----
.../table/descriptors/FunctionDescriptor.scala | 52 ++++
.../table/descriptors/FunctionValidator.scala | 42 +++
.../table/descriptors/HierarchyDescriptor.scala | 34 +++
.../HierarchyDescriptorValidator.scala | 35 +++
.../flink/table/descriptors/PrimitiveType.scala | 55 ++++
.../descriptors/PrimitiveTypeValidator.scala | 146 ++++++++++
.../descriptors/service/FunctionService.scala | 87 ++++++
.../flink/table/descriptors/ClassTypeTest.scala | 77 +++++
.../flink/table/descriptors/FunctionTest.scala | 65 +++++
.../table/descriptors/PrimitiveTypeTest.scala | 117 ++++++++
19 files changed, 1591 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 0efb665..c1db4c1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -46,10 +46,13 @@ public class Environment {
private Deployment deployment;
+ private Map<String, UserDefinedFunction> functions;
+
public Environment() {
this.tables = Collections.emptyMap();
this.execution = new Execution();
this.deployment = new Deployment();
+ this.functions = Collections.emptyMap();
}
public Map<String, TableDescriptor> getTables() {
@@ -76,6 +79,14 @@ public class Environment {
});
}
+ public void setFunctions(List<Map<String, Object>> functions) {
+ this.functions = new HashMap<>(functions.size());
+ functions.forEach(config -> {
+ final UserDefinedFunction f = UserDefinedFunction.create(config);
+ this.functions.put(f.name(), f);
+ });
+ }
+
public void setExecution(Map<String, Object> config) {
this.execution = Execution.create(config);
}
@@ -92,6 +103,10 @@ public class Environment {
return deployment;
}
+ public Map<String, UserDefinedFunction> getFunctions() {
+ return functions;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -136,6 +151,11 @@ public class Environment {
tables.putAll(env2.getTables());
mergedEnv.tables = tables;
+ // merge functions
+ final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions());
+ mergedEnv.getFunctions().putAll(env2.getFunctions());
+ mergedEnv.functions = functions;
+
// merge execution properties
mergedEnv.execution = Execution.merge(env1.getExecution(), env2.getExecution());
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
new file mode 100644
index 0000000..235f4ea
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UserDefinedFunction.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UserDefinedFunction extends FunctionDescriptor {
+
+ private static final String FROM = "from";
+
+ private From from;
+
+ private Map<String, String> properties;
+
+ private UserDefinedFunction(String name, From from, Map<String, String> properties) {
+ super(name);
+ this.from = from;
+ this.properties = properties;
+ }
+
+ /**
+ * Gets where the user-defined function should be created from.
+ */
+ public From getFrom() {
+ return from;
+ }
+
+ /**
+ * Creates a UDF descriptor with the given config.
+ */
+ public static UserDefinedFunction create(Map<String, Object> config) {
+ Map<String, String> udfConfig = ConfigUtil.normalizeYaml(config);
+ if (!udfConfig.containsKey(FunctionValidator.FUNCTION_NAME())) {
+ throw new SqlClientException("The 'name' attribute of a function is missing.");
+ }
+
+ final String name = udfConfig.get(FunctionValidator.FUNCTION_NAME());
+ if (name.trim().length() <= 0) {
+ throw new SqlClientException("Invalid function name '" + name + "'.");
+ }
+
+ // the default value is "CLASS"
+ From fromValue = CLASS;
+
+ if (udfConfig.containsKey(FROM)) {
+ final String from = udfConfig.get(FROM);
+ try {
+ fromValue = From.valueOf(from.toUpperCase());
+ } catch (IllegalArgumentException ex) {
+ throw new SqlClientException("Unknown 'from' value '" + from + "'.");
+ }
+ }
+
+ switch (fromValue) {
+ case CLASS:
+ return new UserDefinedFunction(name, fromValue, udfConfig);
+ default:
+ throw new SqlClientException("The from attribute can only be \"class\" now.");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void addProperties(DescriptorProperties properties) {
+ this.properties.forEach(properties::putString);
+ }
+
+ enum From {
+ CLASS
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index c62faa4..0cd9738 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -42,11 +42,19 @@ import org.apache.flink.table.api.BatchQueryConfig;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.client.config.Deployment;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.TableSourceDescriptor;
+import org.apache.flink.table.descriptors.service.FunctionService;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceFactoryService;
import org.apache.flink.util.FlinkException;
@@ -73,6 +81,7 @@ public class ExecutionContext<T> {
private final List<URL> dependencies;
private final ClassLoader classLoader;
private final Map<String, TableSource<?>> tableSources;
+ private final Map<String, UserDefinedFunction> functions;
private final Configuration flinkConfig;
private final CommandLine commandLine;
private final CustomCommandLine<T> activeCommandLine;
@@ -102,6 +111,16 @@ public class ExecutionContext<T> {
}
});
+ // generate user-defined functions
+ functions = new HashMap<>();
+ mergedEnv.getFunctions().forEach((name, descriptor) -> {
+ DescriptorProperties properties = new DescriptorProperties(true);
+ descriptor.addProperties(properties);
+ functions.put(
+ name,
+ FunctionService.generateUserDefinedFunction(properties, classLoader));
+ });
+
// convert deployment options into command line options that describe a cluster
commandLine = createCommandLine(mergedEnv.getDeployment(), commandLineOptions);
activeCommandLine = findActiveCommandLine(availableCommandLines, commandLine);
@@ -207,6 +226,31 @@ public class ExecutionContext<T> {
// register table sources
tableSources.forEach(tableEnv::registerTableSource);
+
+ // register UDFs
+ if (tableEnv instanceof StreamTableEnvironment) {
+ StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
+ functions.forEach((k, v) -> {
+ if (v instanceof ScalarFunction) {
+ streamTableEnvironment.registerFunction(k, (ScalarFunction) v);
+ } else if (v instanceof AggregateFunction) {
+ streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+ } else if (v instanceof TableFunction) {
+ streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+ }
+ });
+ } else {
+ BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment) tableEnv;
+ functions.forEach((k, v) -> {
+ if (v instanceof ScalarFunction) {
+ batchTableEnvironment.registerFunction(k, (ScalarFunction) v);
+ } else if (v instanceof AggregateFunction) {
+ batchTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
+ } else if (v instanceof TableFunction) {
+ batchTableEnvironment.registerFunction(k, (TableFunction<?>) v);
+ }
+ });
+ }
}
public QueryConfig getQueryConfig() {
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 9a1eb08..b32353f 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -66,6 +66,7 @@ import static org.junit.Assert.assertTrue;
public class LocalExecutorITCase extends TestLogger {
private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
+ private static final String UDF_ENVIRONMENT_FILE = "test-sql-client-udf.yaml";
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;
@@ -149,6 +150,68 @@ public class LocalExecutorITCase extends TestLogger {
}
@Test(timeout = 30_000L)
+ public void testScalarUDF() throws Exception {
+ final Executor executor =
+ createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
+ final SessionContext session = new SessionContext("test-scalarUDF", new Environment());
+ final ResultDescriptor rd =
+ executor.executeQuery(session, "SELECT scalarUDF(10)");
+ final List<String> actualResults =
+ retrieveChangelogResult(executor, session, rd.getResultId());
+ final List<String> expectedResults = new ArrayList<>();
+ expectedResults.add("(true,15)");
+ TestBaseUtils.compareResultCollections(
+ expectedResults, actualResults, Comparator.naturalOrder());
+ }
+
+ @Test(timeout = 30_000L)
+ public void testAggregateUDF() throws Exception {
+ final Executor executor =
+ createDefaultExecutor(UDF_ENVIRONMENT_FILE, clusterClient);
+ final SessionContext session = new SessionContext("test-aggregateUDF", new Environment());
+ final ResultDescriptor rd =
+ executor.executeQuery(session, "SELECT aggregateUDF(cast(1 as BIGINT))");
+ final List<String> actualResults =
+ retrieveChangelogResult(executor, session, rd.getResultId());
+ final List<String> expectedResults = new ArrayList<>();
+ expectedResults.add("(true,100)");
+ TestBaseUtils.compareResultCollections(
+ expectedResults, actualResults, Comparator.naturalOrder());
+ }
+
+ @Test(timeout = 30_000L)
+ public void testTableUDF() throws Exception {
+ final URL url = getClass().getClassLoader().getResource("test-data.csv");
+ Objects.requireNonNull(url);
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_0", url.getPath());
+ final Executor executor =
+ createModifiedExecutor(UDF_ENVIRONMENT_FILE, clusterClient, replaceVars);
+
+ final SessionContext session = new SessionContext("test-aggregateUDF", new Environment());
+ final ResultDescriptor rd =
+ executor.executeQuery(
+ session,
+ "SELECT w, l from TableNumber1, LATERAL TABLE(tableUDF(StringField1)) as T(w, l)");
+ final List<String> actualResults =
+ retrieveChangelogResult(executor, session, rd.getResultId());
+ final List<String> expectedResults = new ArrayList<>();
+ expectedResults.add("(true,Hello,10)");
+ expectedResults.add("(true,World,10)");
+ expectedResults.add("(true,Hello,10)");
+ expectedResults.add("(true,World,10)");
+ expectedResults.add("(true,Hello,10)");
+ expectedResults.add("(true,World,10)");
+ expectedResults.add("(true,Hello,10)");
+ expectedResults.add("(true,World,10)");
+ expectedResults.add("(true,Hello,10)");
+ expectedResults.add("(true,World,10)");
+ expectedResults.add("(true,Hello,10)");
+ expectedResults.add("(true,World!!!!,14)");
+ TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
+ }
+
+ @Test(timeout = 30_000L)
public void testStreamQueryExecutionChangelog() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
@@ -167,20 +230,8 @@ public class LocalExecutorITCase extends TestLogger {
assertFalse(desc.isMaterialized());
- final List<String> actualResults = new ArrayList<>();
-
- while (true) {
- Thread.sleep(50); // slow the processing down
- final TypedResult<List<Tuple2<Boolean, Row>>> result =
- executor.retrieveResultChanges(session, desc.getResultId());
- if (result.getType() == TypedResult.ResultType.PAYLOAD) {
- for (Tuple2<Boolean, Row> change : result.getPayload()) {
- actualResults.add(change.toString());
- }
- } else if (result.getType() == TypedResult.ResultType.EOS) {
- break;
- }
- }
+ final List<String> actualResults =
+ retrieveChangelogResult(executor, session, desc.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("(true,42,Hello World)");
@@ -266,19 +317,32 @@ public class LocalExecutorITCase extends TestLogger {
}
private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception {
+ return createDefaultExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient);
+ }
+
+ private <T> LocalExecutor createDefaultExecutor(
+ String configFileName, ClusterClient<T>
+ clusterClient) throws Exception {
return new LocalExecutor(
- EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, Collections.singletonMap("$VAR_2", "batch")),
+ EnvironmentFileUtil.parseModified(configFileName, Collections.singletonMap("$VAR_2", "batch")),
Collections.emptyList(),
clusterClient.getFlinkConfiguration(),
new DummyCustomCommandLine<T>(clusterClient));
}
private <T> LocalExecutor createModifiedExecutor(ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception {
+ return createModifiedExecutor(DEFAULTS_ENVIRONMENT_FILE, clusterClient, replaceVars);
+ }
+
+ private <T> LocalExecutor createModifiedExecutor(
+ String configFileName,
+ ClusterClient<T> clusterClient,
+ Map<String, String> replaceVars) throws Exception {
return new LocalExecutor(
- EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
- Collections.emptyList(),
- clusterClient.getFlinkConfiguration(),
- new DummyCustomCommandLine<T>(clusterClient));
+ EnvironmentFileUtil.parseModified(configFileName, replaceVars),
+ Collections.emptyList(),
+ clusterClient.getFlinkConfiguration(),
+ new DummyCustomCommandLine<T>(clusterClient));
}
private List<String> retrieveTableResult(
@@ -304,4 +368,24 @@ public class LocalExecutorITCase extends TestLogger {
return actualResults;
}
+
+ private List<String> retrieveChangelogResult(
+ Executor executor,
+ SessionContext session,
+ String resultID) throws InterruptedException {
+ final List<String> actualResults = new ArrayList<>();
+ while (true) {
+ Thread.sleep(50); // slow the processing down
+ final TypedResult<List<Tuple2<Boolean, Row>>> result =
+ executor.retrieveResultChanges(session, resultID);
+ if (result.getType() == TypedResult.ResultType.PAYLOAD) {
+ for (Tuple2<Boolean, Row> change : result.getPayload()) {
+ actualResults.add(change.toString());
+ }
+ } else if (result.getType() == TypedResult.ResultType.EOS) {
+ break;
+ }
+ }
+ return actualResults;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
new file mode 100644
index 0000000..b93ef66
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+/**
+ * A bunch of UDFs for SQL-Client test.
+ */
+public class UserDefinedFunctions {
+
+ /**
+ * The scalar function for SQL-Client test.
+ */
+ public static class ScalarUDF extends ScalarFunction {
+
+ private int offset;
+
+ public ScalarUDF(Integer offset) {
+ this.offset = offset;
+ }
+
+ public String eval(Integer i) {
+ return String.valueOf(i + offset);
+ }
+ }
+
+ /**
+ * The aggregate function for SQL-Client test.
+ */
+ public static class AggregateUDF extends AggregateFunction<Long, Long> {
+
+ public AggregateUDF(String name, Boolean flag, Integer value) {
+
+ }
+
+ @Override
+ public Long createAccumulator() {
+ return 0L;
+ }
+
+ @Override
+ public Long getValue(Long accumulator) {
+ return 100L;
+ }
+
+ public void accumulate(Long acc, Long value) {
+
+ }
+
+ @Override
+ public TypeInformation<Long> getResultType() {
+ return BasicTypeInfo.LONG_TYPE_INFO;
+ }
+ }
+
+ /**
+ * The table function for SQL-Client test.
+ */
+ public static class TableUDF extends TableFunction<Row> {
+ private long extra = 2L;
+
+ public TableUDF(Long extra) {
+ this.extra = extra;
+ }
+
+ public void eval(String str) {
+ for (String s : str.split(" ")) {
+ Row r = new Row(2);
+ r.setField(0, s);
+ r.setField(1, s.length() + extra);
+ collect(r);
+ }
+ }
+
+ @Override
+ public TypeInformation<Row> getResultType() {
+ return Types.ROW(Types.STRING(), Types.LONG());
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
new file mode 100644
index 0000000..3e16030
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-udf.yaml
@@ -0,0 +1,82 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+#==============================================================================
+# TEST ENVIRONMENT FILE
+# Intended for org.apache.flink.table.client.gateway.local.LocalExecutorITCase.
+#==============================================================================
+
+# this file has variables that can be filled with content by replacing $VAR_XXX
+
+tables:
+ - name: TableNumber1
+ type: source
+ schema:
+ - name: IntegerField1
+ type: INT
+ - name: StringField1
+ type: VARCHAR
+ connector:
+ type: filesystem
+ path: "$VAR_0"
+ format:
+ type: csv
+ fields:
+ - name: IntegerField1
+ type: INT
+ - name: StringField1
+ type: VARCHAR
+ line-delimiter: "\n"
+ comment-prefix: "#"
+
+functions:
+ - name: scalarUDF
+ from: class
+ class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF
+ constructor:
+ - 5
+
+ - name: aggregateUDF
+ from: class
+ class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$AggregateUDF
+ constructor:
+ - StarryName
+ - false
+ - class: java.lang.Integer
+ constructor:
+ - class: java.lang.String
+ constructor:
+ - type: VARCHAR
+ value: 3
+
+ - name: tableUDF
+ from: class
+ class: org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$TableUDF
+ constructor:
+ - type: LONG
+ value: 5
+
+execution:
+ type: streaming
+ parallelism: 1
+ result-mode: changelog
+
+deployment:
+ response-timeout: 5000
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
new file mode 100644
index 0000000..78ceb3a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassType.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Descriptor for a class type.
+ *
+ * @param className the full name of the class (e.g., java.lang.Integer)
+ */
+class ClassType(var className: Option[String] = None)
+ extends HierarchyDescriptor {
+
+ // the parameter is either a primitive type or a class type
+ private val constructor: ArrayBuffer[Either[PrimitiveType[_], ClassType]] =
+ ArrayBuffer()
+
+ /**
+ * Sets the class name for the descriptor.
+ */
+ def of(name: String): ClassType = {
+ this.className = Option(name)
+ this
+ }
+
+ /**
+ * Adds the given string formatted value as a parameter, the type of which will be automatically
+ * derived (e.g., "true" -> Boolean, "1" -> Integer, "2.0" -> Double and "abc" -> String).
+ *
+ */
+ def strParam(valueStr: String): ClassType = {
+ val typeString = PrimitiveTypeValidator.deriveTypeStrFromValueStr(valueStr)
+ param(typeString, valueStr)
+ }
+
+ /**
+ * Adds the given string formatted value as a parameter, the type of which will be decided by the
+ * given type string (e.g., "DOUBLE", "VARCHAR").
+ */
+ def param(typeStr: String, valueStr: String): ClassType = {
+ param(TypeStringUtils.readTypeInfo(typeStr), valueStr)
+ }
+
+ /**
+ * Adds the give string formatted value as a parameter, the type of which will be defined by the
+ * given type information.
+ */
+ def param[T](typeInfo: TypeInformation[T], valueStr: String): ClassType = {
+ constructor += Left(
+ new PrimitiveType[T]()
+ .of(typeInfo)
+ .value(PrimitiveTypeValidator.deriveTypeAndValueStr(typeInfo, valueStr)))
+ this
+ }
+
+ /**
+ * Adds the give value as a parameter, the type of which will be automatically derived.
+ */
+ def param[T](value: T): ClassType = {
+ constructor += Left(
+ new PrimitiveType[T]()
+ .of(TypeInformation.of(value.getClass.asInstanceOf[Class[T]]))
+ .value(value))
+ this
+ }
+
+ /**
+ * Adds a parameter defined by the given class type descriptor.
+ */
+ def param(field: ClassType): ClassType = {
+ constructor += Right(field)
+ this
+ }
+
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+ addPropertiesWithPrefix("", properties)
+ }
+
+ override private[flink] def addPropertiesWithPrefix(
+ keyPrefix: String,
+ properties: DescriptorProperties): Unit = {
+ className.foreach(properties.putString(s"$keyPrefix${ClassTypeValidator.CLASS}", _))
+ var i = 0
+ while (i < constructor.size) {
+ constructor(i) match {
+ case Left(basicType) =>
+ basicType.addPropertiesWithPrefix(
+ s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
+ properties)
+ case Right(classType) =>
+ classType.addPropertiesWithPrefix(
+ s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}.$i.",
+ properties)
+ }
+ i += 1
+ }
+ }
+}
+
+object ClassType {
+ def apply() = new ClassType
+ def apply(className: String) = new ClassType(Option(className))
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
new file mode 100644
index 0000000..ebb48b5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeValidator.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+
+import scala.collection.JavaConversions._
+
+/**
+ * Validator for [[ClassType]].
+ */
+class ClassTypeValidator extends HierarchyDescriptorValidator {
+ override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+
+ properties.validateString(s"$keyPrefix${ClassTypeValidator.CLASS}", isOptional = false)
+
+ val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}"
+ normalizeConstructorParams(constructorPrefix, properties)
+
+ val constructorProps =
+ properties.getVariableIndexedProperties(constructorPrefix, List())
+ var i = 0
+ val primitiveTypeValidator = new PrimitiveTypeValidator
+ while (i < constructorProps.size()) {
+ if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) {
+ primitiveTypeValidator.validateWithPrefix(s"$constructorPrefix.$i.", properties)
+ } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) {
+ validateWithPrefix(s"$constructorPrefix.$i.", properties)
+ } else {
+ throw ValidationException("A constructor field must contain a 'class' or a 'type' key.")
+ }
+ i += 1
+ }
+ }
+
+ /**
+ * For each constructor parameter (e.g., constructor.0 = abc), we derive its type and replace it
+ * with the normalized form (e.g., constructor.0.type = VARCHAR, constructor.0.value = abc);
+ *
+ * @param constructorPrefix the prefix to get the constructor parameters
+ * @param properties the descriptor properties
+ */
+ def normalizeConstructorParams(
+ constructorPrefix: String,
+ properties: DescriptorProperties): Unit = {
+ val constructorValues = properties.getListProperties(constructorPrefix)
+ constructorValues.foreach(kv => {
+ properties.unsafeRemove(kv._1)
+ val tp = PrimitiveTypeValidator.deriveTypeStrFromValueStr(kv._2)
+ properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_TYPE}", tp)
+ properties.putString(s"${kv._1}.${PrimitiveTypeValidator.PRIMITIVE_VALUE}", kv._2)
+ })
+ }
+}
+
+object ClassTypeValidator {
+ val CLASS = "class"
+ val CONSTRUCTOR = "constructor"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 4b0b60d..8d410d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -19,7 +19,8 @@
package org.apache.flink.table.descriptors
import java.io.Serializable
-import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
import java.util
import java.util.function.{Consumer, Supplier}
import java.util.regex.Pattern
@@ -238,10 +239,25 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
}
/**
+ * Returns a big decimal value under the given key if it exists.
+ */
+ def getOptionalBigDecimal(key: String): Optional[JBigDecimal] = {
+ val value = properties.get(key).map(new JBigDecimal(_))
+ toJava(value)
+ }
+
+ /**
+ * Returns a big decimal value under the given existing key.
+ */
+ def getBigDecimal(key: String): BigDecimal = {
+ getOptionalBigDecimal(key).orElseThrow(exceptionSupplier(key))
+ }
+
+ /**
* Returns a boolean value under the given key if it exists.
*/
def getOptionalBoolean(key: String): Optional[JBoolean] = {
- val value = properties.get(key).map(JBoolean.parseBoolean(_)).map(Boolean.box)
+ val value = properties.get(key).map(JBoolean.parseBoolean).map(Boolean.box)
toJava(value)
}
@@ -253,10 +269,55 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
}
/**
+ * Returns a byte value under the given key if it exists.
+ */
+ def getOptionalByte(key: String): Optional[JByte] = {
+ val value = properties.get(key).map(JByte.parseByte).map(Byte.box)
+ toJava(value)
+ }
+
+ /**
+ * Returns a byte value under the given existing key.
+ */
+ def getByte(key: String): Byte = {
+ getOptionalByte(key).orElseThrow(exceptionSupplier(key))
+ }
+
+ /**
+ * Returns a double value under the given key if it exists.
+ */
+ def getOptionalDouble(key: String): Optional[JDouble] = {
+ val value = properties.get(key).map(JDouble.parseDouble).map(Double.box)
+ toJava(value)
+ }
+
+ /**
+ * Returns a double value under the given key if it exists.
+ */
+ def getDouble(key: String): Double = {
+ getOptionalDouble(key).orElseThrow(exceptionSupplier(key))
+ }
+
+ /**
+ * Returns a float value under the given key if it exists.
+ */
+ def getOptionalFloat(key: String): Optional[JFloat] = {
+ val value = properties.get(key).map(JFloat.parseFloat).map(Float.box)
+ toJava(value)
+ }
+
+ /**
+ * Returns a float value under the given key if it exists.
+ */
+ def getFloat(key: String): Float = {
+ getOptionalFloat(key).orElseThrow(exceptionSupplier(key))
+ }
+
+ /**
* Returns an integer value under the given key if it exists.
*/
def getOptionalInt(key: String): Optional[JInt] = {
- val value = properties.get(key).map(JInt.parseInt(_)).map(Int.box)
+ val value = properties.get(key).map(JInt.parseInt).map(Int.box)
toJava(value)
}
@@ -271,7 +332,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
* Returns a long value under the given key if it exists.
*/
def getOptionalLong(key: String): Optional[JLong] = {
- val value = properties.get(key).map(JLong.parseLong(_)).map(Long.box)
+ val value = properties.get(key).map(JLong.parseLong).map(Long.box)
toJava(value)
}
@@ -283,18 +344,18 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
}
/**
- * Returns a double value under the given key if it exists.
+ * Returns a short value under the given key if it exists.
*/
- def getOptionalDouble(key: String): Optional[JDouble] = {
- val value = properties.get(key).map(JDouble.parseDouble(_)).map(Double.box)
+ def getOptionalShort(key: String): Optional[JShort] = {
+ val value = properties.get(key).map(JShort.parseShort).map(Short.box)
toJava(value)
}
/**
- * Returns a double value under the given key if it exists.
+ * Returns a short value under the given existing key.
*/
- def getDouble(key: String): Double = {
- getOptionalDouble(key).orElseThrow(exceptionSupplier(key))
+ def getShort(key: String): Short = {
+ getOptionalShort(key).orElseThrow(exceptionSupplier(key))
}
/**
@@ -470,6 +531,16 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
}
/**
+ * Returns all properties under a group of array formed keys.
+ *
+ * E.g. constructor -> returns all constructor.# properties.
+ */
+ def getListProperties(key: String): JMap[String, String] = {
+ val escapedKey = Pattern.quote(key)
+ properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+")).asJava
+ }
+
+ /**
* Returns all properties under a given key that contains an index in between.
*
* E.g. rowtime.0.name -> returns all rowtime.#.name properties
@@ -566,24 +637,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
min: Int, // inclusive
max: Int) // inclusive
: Unit = {
-
- if (!properties.contains(key)) {
- if (!isOptional) {
- throw new ValidationException(s"Could not find required property '$key'.")
- }
- } else {
- try {
- val value = Integer.parseInt(properties(key))
- if (value < min || value > max) {
- throw new ValidationException(s"Property '$key' must be an integer value between $min " +
- s"and $max but was: ${properties(key)}")
- }
- } catch {
- case _: NumberFormatException =>
- throw new ValidationException(
- s"Property '$key' must be an integer value but was: ${properties(key)}")
- }
- }
+ validateComparable(key, isOptional, new Integer(min), new Integer(max), Integer.valueOf)
}
/**
@@ -616,24 +670,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
min: Long, // inclusive
max: Long) // inclusive
: Unit = {
-
- if (!properties.contains(key)) {
- if (!isOptional) {
- throw new ValidationException(s"Could not find required property '$key'.")
- }
- } else {
- try {
- val value = JLong.parseLong(properties(key))
- if (value < min || value > max) {
- throw new ValidationException(s"Property '$key' must be a long value between $min " +
- s"and $max but was: ${properties(key)}")
- }
- } catch {
- case _: NumberFormatException =>
- throw new ValidationException(
- s"Property '$key' must be a long value but was: ${properties(key)}")
- }
- }
+ validateComparable(key, isOptional, new JLong(min), new JLong(max), JLong.valueOf)
}
/**
@@ -699,22 +736,165 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
min: Double, // inclusive
max: Double) // inclusive
: Unit = {
+ validateComparable(key, isOptional, new JDouble(min), new JDouble(max), JDouble.valueOf)
+ }
+ /**
+ * Validates a big decimal property.
+ */
+ def validateBigDecimal(
+ key: String,
+ isOptional: Boolean): Unit = {
+
+ if (!properties.contains(key)) {
+ if (!isOptional) {
+ throw new ValidationException(s"Could not find required property '$key'.")
+ }
+ } else {
+ try {
+ new JBigDecimal(properties(key))
+ } catch {
+ case _: NumberFormatException =>
+ throw new ValidationException(
+ s"Property '$key' must be a big decimal value but was: ${properties(key)}")
+ }
+ }
+ }
+
+ /**
+ * Validates a big decimal property. The boundaries are inclusive.
+ */
+ def validateBigDecimal(
+ key: String,
+ isOptional: Boolean,
+ min: BigDecimal, // inclusive
+ max: BigDecimal) // inclusive
+ : Unit = {
+ validateComparable(
+ key,
+ isOptional,
+ min.bigDecimal,
+ max.bigDecimal,
+ (value: String) => new JBigDecimal(value))
+ }
+
+ /**
+ * Validates a byte property.
+ */
+ def validateByte(
+ key: String,
+ isOptional: Boolean): Unit = validateDouble(key, isOptional, Byte.MinValue, Byte.MaxValue)
+
+ /**
+ * Validates a byte property. The boundaries are inclusive.
+ */
+ def validateByte(
+ key: String,
+ isOptional: Boolean,
+ min: Byte) // inclusive
+ : Unit = {
+ validateByte(key, isOptional, min, Byte.MaxValue)
+ }
+
+ /**
+ * Validates a byte property. The boundaries are inclusive.
+ */
+ def validateByte(
+ key: String,
+ isOptional: Boolean,
+ min: Byte, // inclusive
+ max: Byte) // inclusive
+ : Unit = {
+ validateComparable(key, isOptional, new JByte(min), new JByte(max), JByte.valueOf)
+ }
+
+ /**
+ * Validates a float property.
+ */
+ def validateFloat(
+ key: String,
+ isOptional: Boolean): Unit = validateFloat(key, isOptional, Float.MinValue, Float.MaxValue)
+
+ /**
+ * Validates a float property. The boundaries are inclusive.
+ */
+ def validateFloat(
+ key: String,
+ isOptional: Boolean,
+ min: Float) // inclusive
+ : Unit = {
+ validateFloat(key, isOptional, min, Float.MaxValue)
+ }
+
+ /**
+ * Validates a float property. The boundaries are inclusive.
+ */
+ def validateFloat(
+ key: String,
+ isOptional: Boolean,
+ min: Float, // inclusive
+ max: Float) // inclusive
+ : Unit = {
+ validateComparable(key, isOptional, new JFloat(min), new JFloat(max), JFloat.valueOf)
+ }
+
+ /**
+ * Validates a short property.
+ */
+ def validateShort(
+ key: String,
+ isOptional: Boolean): Unit = validateFloat(key, isOptional, Short.MinValue, Short.MaxValue)
+
+ /**
+ * Validates a short property. The boundaries are inclusive.
+ */
+ def validateFloat(
+ key: String,
+ isOptional: Boolean,
+ min: Short) // inclusive
+ : Unit = {
+ validateShort(key, isOptional, min, Short.MaxValue)
+ }
+
+ /**
+ * Validates a float property. The boundaries are inclusive.
+ */
+ def validateShort(
+ key: String,
+ isOptional: Boolean,
+ min: Short, // inclusive
+ max: Short) // inclusive
+ : Unit = {
+ validateComparable(key, isOptional, new JShort(min), new JShort(max), JShort.valueOf)
+ }
+
+ /**
+ * Validates a property by first parsing the string value to a comparable object.
+ * The boundaries are inclusive.
+ */
+ private def validateComparable[T <: Comparable[T]](
+ key: String,
+ isOptional: Boolean,
+ min: T,
+ max: T,
+ parseFunction: String => T)
+ : Unit = {
if (!properties.contains(key)) {
if (!isOptional) {
throw new ValidationException(s"Could not find required property '$key'.")
}
} else {
try {
- val value = JDouble.parseDouble(properties(key))
- if (value < min || value > max) {
- throw new ValidationException(s"Property '$key' must be a double value between $min " +
- s"and $max but was: ${properties(key)}")
+ val value = parseFunction(properties(key))
+
+ if (value.compareTo(min) < 0 || value.compareTo(max) > 0) {
+ throw new ValidationException(s"Property '$key' must be a ${min.getClass.getSimpleName}" +
+ s" value between $min and $max but was: ${properties(key)}")
}
} catch {
case _: NumberFormatException =>
throw new ValidationException(
- s"Property '$key' must be an double value but was: ${properties(key)}")
+ s"Property '$key' must be a byte value but was: ${properties(key)}")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
new file mode 100644
index 0000000..f4c8363
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * Descriptor for describing a function that can be instantiated from somewhere (e.g., a class).
+ *
+ * @param name name of the function
+ */
+class FunctionDescriptor(var name: String) extends Descriptor {
+
+ var classDescriptor: Option[ClassType] = None
+
+ /**
+ * Uses the class provided by the descriptor to instantiate the function.
+ */
+ def using(classDescriptor: ClassType): FunctionDescriptor = {
+ this.classDescriptor = Option(classDescriptor)
+ this
+ }
+
+ def getDescriptorProperties: DescriptorProperties = {
+ val descriptorProperties = new DescriptorProperties()
+ addProperties(descriptorProperties)
+ descriptorProperties
+ }
+
+ override def addProperties(properties: DescriptorProperties): Unit = {
+ properties.putString(FunctionValidator.FUNCTION_NAME, name)
+ classDescriptor.foreach(_.addProperties(properties))
+ }
+}
+
+object FunctionDescriptor {
+ def apply(name: String): FunctionDescriptor = new FunctionDescriptor(name)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
new file mode 100644
index 0000000..f36b9c2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionValidator.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.functions.UserDefinedFunction
+
+import scala.collection.JavaConverters._
+
+/**
+ * Validator for [[FunctionDescriptor]].
+ */
+class FunctionValidator extends DescriptorValidator {
+
+ override def validate(properties: DescriptorProperties): Unit = {
+ properties.validateString(FunctionValidator.FUNCTION_NAME, isOptional = false, 1)
+ new ClassTypeValidator().validate(properties)
+ }
+}
+
+object FunctionValidator {
+
+ val FUNCTION_NAME = "name"
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
new file mode 100644
index 0000000..f97c807
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * A descriptor that may exist in an arbitrary level (be recursively included by other
+ * descriptors).
+ */
+abstract class HierarchyDescriptor extends Descriptor {
+
+ /**
+ * Internal method for properties conversion. All the property keys will be prefixed according
+ * to the level.
+ */
+ private[flink] def addPropertiesWithPrefix(
+ keyPrefix: String, properties: DescriptorProperties): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
new file mode 100644
index 0000000..73dd1f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptorValidator.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * Validator for a [[HierarchyDescriptor]].
+ */
+trait HierarchyDescriptorValidator extends DescriptorValidator{
+
+ def validate(properties: DescriptorProperties): Unit = {
+ validateWithPrefix("", properties)
+ }
+
+ /**
+ * Performs validation with a prefix for the keys.
+ */
+ def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala
new file mode 100644
index 0000000..9e5fcab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveType.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+ * Descriptor for a primitive type. Use internally only.
+ */
+class PrimitiveType[T] extends HierarchyDescriptor {
+
+ var typeInformation: TypeInformation[T] = _
+ var value: T = _
+
+ def of(basicType: TypeInformation[T]): PrimitiveType[T] = {
+ typeInformation = basicType
+ this
+ }
+
+ def value(value: T): PrimitiveType[T] = {
+ this.value = value
+ this
+ }
+
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+ addPropertiesWithPrefix("", properties)
+ }
+
+ override private[flink] def addPropertiesWithPrefix(
+ keyPrefix: String, properties: DescriptorProperties): Unit = {
+ properties.putString(keyPrefix + "type", TypeStringUtils.writeTypeInfo(typeInformation))
+ properties.putString(keyPrefix + "value", value.toString)
+ }
+}
+
+object PrimitiveType {
+ def apply[T]() = new PrimitiveType[T]()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
new file mode 100644
index 0000000..9b2f776
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableException, Types}
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+ * Validator for [[PrimitiveType]].
+ */
+class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
+
+ /*
+ * TODO The following types need to be supported next.
+ * Types.SQL_DATE
+ * Types.SQL_TIME
+ * Types.SQL_TIMESTAMP
+ * Types.INTERVAL_MONTHS
+ * Types.INTERVAL_MILLIS
+ * Types.PRIMITIVE_ARRAY
+ * Types.OBJECT_ARRAY
+ * Types.MAP
+ * Types.MULTISET
+ */
+
+ override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+ val typeKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}"
+ val valueKey = s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}"
+
+ properties.validateType(typeKey, isOptional = false)
+
+ val typeInfo: TypeInformation[_] =
+ properties.getType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}")
+ typeInfo match {
+ case Types.DECIMAL => properties.validateBigDecimal(valueKey, isOptional = false)
+ case Types.BOOLEAN => properties.validateBoolean(valueKey, isOptional = false)
+ case Types.BYTE => properties.validateByte(valueKey, isOptional = false)
+ case Types.DOUBLE => properties.validateDouble(valueKey, isOptional = false)
+ case Types.FLOAT => properties.validateFloat(valueKey, isOptional = false)
+ case Types.INT => properties.validateInt(valueKey, isOptional = false)
+ case Types.LONG => properties.validateLong(valueKey, isOptional = false)
+ case Types.SHORT => properties.validateShort(valueKey, isOptional = false)
+ case Types.STRING => properties.validateString(valueKey, isOptional = false)
+ case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
+ }
+ }
+}
+
+object PrimitiveTypeValidator {
+ val PRIMITIVE_TYPE = "type"
+ val PRIMITIVE_VALUE = "value"
+
+ private val LITERAL_FALSE = "false"
+ private val LITERAL_TRUE = "true"
+
+ /**
+ * Derives the value according to the type and value strings.
+ *
+ * @param keyPrefix the prefix of the primitive type key
+ * @param properties the descriptor properties
+ * @return the derived value
+ */
+ def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
+ val typeInfo =
+ properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
+ val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
+ val value = typeInfo match {
+ case Types.DECIMAL => properties.getBigDecimal(valueKey)
+ case Types.BOOLEAN => properties.getBoolean(valueKey)
+ case Types.BYTE => properties.getByte(valueKey)
+ case Types.DOUBLE => properties.getDouble(valueKey)
+ case Types.FLOAT => properties.getFloat(valueKey)
+ case Types.INT => properties.getInt(valueKey)
+ case Types.LONG => properties.getLong(valueKey)
+ case Types.SHORT => properties.getShort(valueKey)
+ case Types.STRING => properties.getString(valueKey)
+ case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
+ }
+ value
+ }
+
+ /**
+ * Derives the actually value with the type information and string formatted value.
+ */
+ def deriveTypeAndValueStr[T](typeInfo: TypeInformation[T], valueStr: String): T = {
+ typeInfo match {
+ case Types.DECIMAL => new JBigDecimal(valueStr).asInstanceOf[T]
+ case Types.BOOLEAN => JBoolean.parseBoolean(valueStr).asInstanceOf[T]
+ case Types.BYTE => JByte.parseByte(valueStr).asInstanceOf[T]
+ case Types.DOUBLE => JDouble.parseDouble(valueStr).asInstanceOf[T]
+ case Types.FLOAT => JFloat.parseFloat(valueStr).asInstanceOf[T]
+ case Types.INT => JInt.parseInt(valueStr).asInstanceOf[T]
+ case Types.LONG => JLong.parseLong(valueStr).asInstanceOf[T]
+ case Types.SHORT => JShort.parseShort(valueStr).asInstanceOf[T]
+ case Types.STRING => valueStr.asInstanceOf[T]
+ case _ => throw TableException(s"Unsupported type ${typeInfo.getTypeClass}.")
+ }
+ }
+
+/**
+ * Tries to derive the type string from the given string value.
+ * The derive priority for the types are BOOLEAN, INT, DOUBLE, and VARCHAR.
+ *
+ * @param valueStr the string formatted value
+ * @return the type string of the given value
+ */
+ def deriveTypeStrFromValueStr(valueStr: String): String = {
+ if (valueStr.equals(LITERAL_TRUE) || valueStr.equals(LITERAL_FALSE)) {
+ TypeStringUtils.BOOLEAN.key
+ } else {
+ try {
+ valueStr.toInt
+ TypeStringUtils.INT.key
+ } catch {
+ case _: NumberFormatException =>
+ try {
+ valueStr.toDouble
+ TypeStringUtils.DOUBLE.key
+ } catch {
+ case _: NumberFormatException =>
+ TypeStringUtils.STRING.key
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala
new file mode 100644
index 0000000..9e97817
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/service/FunctionService.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors.service
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.{ClassTypeValidator, DescriptorProperties, FunctionDescriptor, FunctionValidator, PrimitiveTypeValidator}
+import org.apache.flink.table.functions.UserDefinedFunction
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+/**
+ * Utils that serve [[FunctionDescriptor]].
+ */
+object FunctionService {
+ /**
+ * Generates a user-defined function with the given properties.
+ *
+ * @param properties the descriptor properties that belongs to a [[FunctionDescriptor]]
+ * @param classLoader the class loader to load the function and its parameter's classes
+ * @return the generated user-defined function
+ */
+ def generateUserDefinedFunction(
+ properties: DescriptorProperties,
+ classLoader: ClassLoader): UserDefinedFunction = {
+ new FunctionValidator().validate(properties)
+ generateInstance[UserDefinedFunction]("", properties, classLoader)
+ }
+
+ /**
+ * Recursively generate an instance of a class according the given properties.
+ *
+ * @param keyPrefix the prefix to fetch properties
+ * @param descriptorProps the descriptor properties that contains the class type information
+ * @param classLoader the class loader to load the class
+ * @tparam T type fo the generated instance
+ * @return an instance of the class
+ */
+ def generateInstance[T](
+ keyPrefix: String,
+ descriptorProps: DescriptorProperties,
+ classLoader: ClassLoader): T = {
+ val constructorPrefix = s"$keyPrefix${ClassTypeValidator.CONSTRUCTOR}"
+ val constructorProps =
+ descriptorProps.getVariableIndexedProperties(constructorPrefix, List())
+ var i = 0
+ val typeValueList: ArrayBuffer[(Class[_], Any)] = new ArrayBuffer
+ while (i < constructorProps.size()) {
+ if (constructorProps(i).containsKey(PrimitiveTypeValidator.PRIMITIVE_TYPE)) {
+ val primitiveVal = PrimitiveTypeValidator
+ .derivePrimitiveValue(s"$constructorPrefix.$i.", descriptorProps)
+ typeValueList += ((primitiveVal.getClass, primitiveVal))
+ } else if (constructorProps(i).containsKey(ClassTypeValidator.CLASS)) {
+ val typeValuePair = (
+ Class.forName(
+ descriptorProps.getString(constructorProps(i).get(ClassTypeValidator.CLASS))),
+ generateInstance(s"$constructorPrefix.$i.", descriptorProps, classLoader))
+ typeValueList += typeValuePair
+ }
+ i += 1
+ }
+ val clazz = classLoader
+ .loadClass(descriptorProps.getString(s"$keyPrefix${ClassTypeValidator.CLASS}"))
+ val constructor = clazz.getConstructor(typeValueList.map(_._1): _*)
+ if (null == constructor) {
+ throw TableException(s"Cannot find a constructor with parameter types " +
+ s"${typeValueList.map(_._1)} for ${clazz.getName}")
+ }
+ constructor.newInstance(typeValueList.map(_._2.asInstanceOf[AnyRef]): _*).asInstanceOf[T]
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala
new file mode 100644
index 0000000..43710f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ClassTypeTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{List => JList, Map => JMap, Arrays => JArrays}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class ClassTypeTest extends DescriptorTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testMissingClass(): Unit = {
+ removePropertyAndVerify(descriptors().get(0), ClassTypeValidator.CLASS)
+ }
+
+ override def descriptors(): JList[Descriptor] = {
+ val desc1 = ClassType("class1")
+ .param(BasicTypeInfo.LONG_TYPE_INFO, "1")
+ .param(
+ ClassType("class2")
+ .param(
+ ClassType("class3")
+ .param("StarryNight")
+ .param(
+ ClassType("class4"))))
+ .param(2L)
+
+ val desc2 = ClassType().of("class2")
+
+ JArrays.asList(desc1, desc2)
+ }
+
+ override def validator(): DescriptorValidator = {
+ new ClassTypeValidator()
+ }
+
+ override def properties(): JList[JMap[String, String]] = {
+ val props1 = Map(
+ "class" -> "class1",
+ "constructor.0.type" -> "BIGINT",
+ "constructor.0.value" -> "1",
+ "constructor.1.class" -> "class2",
+ "constructor.1.constructor.0.class" -> "class3",
+ "constructor.1.constructor.0.constructor.0.type" -> "VARCHAR",
+ "constructor.1.constructor.0.constructor.0.value" -> "StarryNight",
+ "constructor.1.constructor.0.constructor.1.class" -> "class4",
+ "constructor.2.type" -> "BIGINT",
+ "constructor.2.value" -> "2"
+ )
+
+ val props2 = Map(
+ "class" -> "class2"
+ )
+
+ JArrays.asList(props1.asJava, props2.asJava)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
new file mode 100644
index 0000000..3b93ca25
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FunctionTest.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{List => JList, Map => JMap, Arrays => JArrays}
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class FunctionTest extends DescriptorTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testMissingName(): Unit = {
+ removePropertyAndVerify(descriptors().get(0), "name")
+ }
+
+ override def descriptors(): JList[Descriptor] = {
+ val desc1 = FunctionDescriptor("func1")
+ .using(
+ ClassType("another.class")
+ .of("my.class")
+ .param("INT", "1")
+ .param(
+ ClassType()
+ .of("my.class2")
+ .strParam("true")))
+
+ JArrays.asList(desc1)
+ }
+
+ override def validator(): DescriptorValidator = {
+ new FunctionValidator()
+ }
+
+ override def properties(): JList[JMap[String, String]] = {
+ val props1 = Map(
+ "name" -> "func1",
+ "class" -> "my.class",
+ "constructor.0.type" -> "INT",
+ "constructor.0.value" -> "1",
+ "constructor.1.class" -> "my.class2",
+ "constructor.1.constructor.0.type" -> "BOOLEAN",
+ "constructor.1.constructor.0.value" -> "true"
+ )
+ JArrays.asList(props1.asJava)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8014dade/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
new file mode 100644
index 0000000..f684e2a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/PrimitiveTypeTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.{Arrays => JArrays, List => JList, Map => JMap}
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class PrimitiveTypeTest extends DescriptorTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testMissingType(): Unit = {
+ removePropertyAndVerify(descriptors().get(0), PrimitiveTypeValidator.PRIMITIVE_TYPE)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testMissingValue(): Unit = {
+ removePropertyAndVerify(descriptors().get(0), PrimitiveTypeValidator.PRIMITIVE_VALUE)
+ }
+
+ override def descriptors(): JList[Descriptor] = {
+ val bigDecimalDesc = PrimitiveType().of(Types.DECIMAL).value(new JBigDecimal(1))
+ val booleanDesc = PrimitiveType().of(Types.BOOLEAN).value(false)
+ val byteDesc = PrimitiveType().of(Types.BYTE).value(4.asInstanceOf[Byte])
+ val doubleDesc = PrimitiveType().of(Types.DOUBLE).value(7.0)
+ val floatDesc = PrimitiveType().of(Types.FLOAT).value(8f)
+ val intDesc = PrimitiveType().of(Types.INT).value(9)
+ val longDesc = PrimitiveType().of(Types.LONG).value(10L)
+ val shortDesc = PrimitiveType().of(Types.SHORT).value(11.asInstanceOf[Short])
+ val stringDesc = PrimitiveType().of(Types.STRING).value("12")
+
+ JArrays.asList(
+ bigDecimalDesc,
+ booleanDesc,
+ byteDesc,
+ doubleDesc,
+ floatDesc,
+ intDesc,
+ longDesc,
+ shortDesc,
+ stringDesc)
+ }
+
+ override def validator(): DescriptorValidator = {
+ new PrimitiveTypeValidator()
+ }
+
+ override def properties(): JList[JMap[String, String]] = {
+ val bigDecimalProps = Map(
+ "type" -> "DECIMAL",
+ "value" -> "1"
+ )
+ val booleanDesc = Map(
+ "type" -> "BOOLEAN",
+ "value" -> "false"
+ )
+ val byteDesc = Map(
+ "type" -> "TINYINT",
+ "value" -> "4"
+ )
+
+ val doubleDesc = Map(
+ "type" -> "DOUBLE",
+ "value" -> "7.0"
+ )
+ val floatDesc = Map(
+ "type" -> "FLOAT",
+ "value" -> "8.0"
+ )
+ val intProps = Map(
+ "type" -> "INT",
+ "value" -> "9"
+ )
+ val longDesc = Map(
+ "type" -> "BIGINT",
+ "value" -> "10"
+ )
+ val shortDesc = Map(
+ "type" -> "SMALLINT",
+ "value" -> "11"
+ )
+ val stringDesc = Map(
+ "type" -> "VARCHAR",
+ "value" -> "12"
+ )
+ JArrays.asList(
+ bigDecimalProps.asJava,
+ booleanDesc.asJava,
+ byteDesc.asJava,
+ doubleDesc.asJava,
+ floatDesc.asJava,
+ intProps.asJava,
+ longDesc.asJava,
+ shortDesc.asJava,
+ stringDesc.asJava)
+ }
+}