Posted to issues@flink.apache.org by xccui <gi...@git.apache.org> on 2018/05/28 17:51:22 UTC
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
GitHub user xccui opened a pull request:
https://github.com/apache/flink/pull/6090
[FLINK-8863] [SQL] Add user-defined function support in SQL Client
## What is the purpose of the change
This PR aims to add user-defined function (ScalarFunction, TableFunction and AggregateFunction) support to the SQL Client.
## Brief change log
- Introduce a new `HierarchyDescriptor` and its corresponding validator `HierarchyDescriptorValidator`, which allow constructing descriptors hierarchically.
- Add a `PrimitiveTypeDescriptor` to describe a primitive type value and a `ClassTypeDescriptor` to describe a class type value. A `ClassTypeDescriptor` contains a `constructor` field, which is a list whose elements are `PrimitiveTypeDescriptor`s or (nested) `ClassTypeDescriptor`s.
- Add a `UDFDescriptor` and its base class `FunctionDescriptor` to describe a `UserDefinedFunction`. Given a `DescriptorProperties`, a `UserDefinedFunction` can be instantiated with the `FunctionValidator.generateUserDefinedFunction()` method.
- Add related tests for the new components.
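A minimal sketch of what the hierarchical `ClassTypeDescriptor`/`PrimitiveTypeDescriptor` idea boils down to: a class descriptor names a class and lists its constructor arguments, each either a primitive value or another class descriptor, and instantiation recurses through that tree via reflection. `Node`, `Primitive` and `ClassNode` are hypothetical stand-ins, not the classes from this PR, and the sketch assumes each declared argument type matches the constructor signature exactly.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a descriptor node: either a primitive value or a class.
interface Node {
    // The Java type under which this node is passed to a parent constructor.
    Class<?> parameterType(ClassLoader loader);

    // The value or object this node describes.
    Object instantiate(ClassLoader loader);
}

// Plays the role of PrimitiveTypeDescriptor: a value plus its declared type.
final class Primitive implements Node {
    private final Class<?> type;
    private final Object value;

    Primitive(Class<?> type, Object value) {
        this.type = type;
        this.value = value;
    }

    public Class<?> parameterType(ClassLoader loader) { return type; }

    public Object instantiate(ClassLoader loader) { return value; }
}

// Plays the role of ClassTypeDescriptor: a class name plus a "constructor" list.
final class ClassNode implements Node {
    private final String className;
    private final List<Node> constructor;

    ClassNode(String className, Node... constructor) {
        this.className = className;
        this.constructor = Arrays.asList(constructor);
    }

    public Class<?> parameterType(ClassLoader loader) {
        try {
            return Class.forName(className, true, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown class: " + className, e);
        }
    }

    public Object instantiate(ClassLoader loader) {
        int n = constructor.size();
        Class<?>[] types = new Class<?>[n];
        Object[] args = new Object[n];
        for (int i = 0; i < n; i++) {
            // Recurse: nested class nodes are built before their parent.
            types[i] = constructor.get(i).parameterType(loader);
            args[i] = constructor.get(i).instantiate(loader);
        }
        try {
            // getConstructor needs the exact declared parameter types.
            Constructor<?> ctor = parameterType(loader).getConstructor(types);
            return ctor.newInstance(args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

For example, `new ClassNode("java.lang.String", new ClassNode("java.lang.StringBuilder", new Primitive(String.class, "xy")))` builds the inner `StringBuilder` first and then passes it to `String(StringBuilder)`.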
## Verifying this change
The change can be verified with the test cases added in `LocalExecutorITCase.java`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (*the doc has not been finished yet*)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/xccui/flink FLINK-8863-udf
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6090.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6090
----
commit c6c7d63f96eda01e24735271859ea8528e229021
Author: Xingcan Cui <xi...@...>
Date: 2018-05-27T15:36:25Z
[FLINK-8863] Add user-defined function support in SQL Client
----
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194463649
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
+import scala.collection.JavaConversions._
+
+/**
+ * Validator for [[PrimitiveTypeDescriptor]].
+ */
+class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
+ override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+ properties
+ .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false)
+ properties
+ .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1)
+ }
+}
+
+object PrimitiveTypeValidator {
+ val PRIMITIVE_TYPE = "type"
+ val PRIMITIVE_VALUE = "value"
+
+ def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
+ val typeInfo =
+ properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
+ val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
+ val value = typeInfo match {
+ case basicType: BasicTypeInfo[_] =>
+ basicType match {
+ case BasicTypeInfo.INT_TYPE_INFO =>
+ properties.getInt(valueKey)
+ case BasicTypeInfo.LONG_TYPE_INFO =>
+ properties.getLong(valueKey)
+ case BasicTypeInfo.DOUBLE_TYPE_INFO =>
+ properties.getDouble(valueKey)
+ case BasicTypeInfo.STRING_TYPE_INFO =>
+ properties.getString(valueKey)
+ case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
+ properties.getBoolean(valueKey)
+ //TODO add more types
--- End diff --
Would be great if you could add support for all Java basic types. We can do arrays in a follow-up issue or on user request.
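One hedged way to cover all Java basic types is to dispatch on the declared type and delegate to the boxed types' own parsers (`Byte.valueOf()` and friends), as a later comment in this thread also suggests. The type names below (`TINYINT`, `SMALLINT`, `BIGINT`, ...) are assumed SQL-style names, not necessarily the exact strings produced by `TypeStringUtils`, and dates and `BigInteger` are left out of this sketch.

```java
import java.math.BigDecimal;

final class BasicValueParser {
    private BasicValueParser() {}

    // Parses a raw string property into a value of the declared basic type.
    static Object parse(String typeName, String raw) {
        switch (typeName) {
            case "BOOLEAN":  return parseBoolean(raw);
            case "TINYINT":  return Byte.valueOf(raw);
            case "SMALLINT": return Short.valueOf(raw);
            case "INT":      return Integer.valueOf(raw);
            case "BIGINT":   return Long.valueOf(raw);
            case "FLOAT":    return Float.valueOf(raw);
            case "DOUBLE":   return Double.valueOf(raw);
            case "DECIMAL":  return new BigDecimal(raw);
            case "VARCHAR":  return raw;
            default:
                throw new IllegalArgumentException("Unsupported basic type: " + typeName);
        }
    }

    // Boolean.valueOf maps every non-"true" string to false, so be strict instead.
    private static Boolean parseBoolean(String raw) {
        if ("true".equalsIgnoreCase(raw)) {
            return Boolean.TRUE;
        }
        if ("false".equalsIgnoreCase(raw)) {
            return Boolean.FALSE;
        }
        throw new IllegalArgumentException("Not a boolean: " + raw);
    }
}
```

Delegating to the JDK parsers keeps range checks (e.g. `Byte.valueOf("300")` throws `NumberFormatException`) out of the descriptor code.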
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194451906
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
@@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
private final StreamExecutionEnvironment streamExecEnv;
private final TableEnvironment tableEnv;
+ @SuppressWarnings("unchecked")
--- End diff --
Remove this?
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r198426083
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
@@ -0,0 +1,153 @@
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
+
+ private static final String FROM = "from";
+
+ private From from;
+
+ private UDFDescriptor(String name, From from) {
+ super(name);
+ this.from = from;
+ }
+
+ public From getFrom() {
+ return from;
+ }
+
+ /**
+ * Create a UDF descriptor with the given config.
+ */
+ public static UDFDescriptor create(Map<String, Object> config) {
+ if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
+ throw new SqlClientException("The 'name' attribute of a function is missing.");
+ }
+
+ final Object name = config.get(FunctionValidator.FUNCTION_NAME());
+ if (!(name instanceof String) || ((String) name).length() <= 0) {
+ throw new SqlClientException("Invalid function name '" + name + "'.");
+ }
--- End diff --
I think it is ok to lose the type information from YAML. Whether a value is a boolean or a string should be decided from the pure string property. We will also support JSON and other translation paths in the future, so the parsing logic should be centralized. Let's treat `true`/`false` strings as booleans. If such a value should be interpreted as a string, a user can specify `type: VARCHAR`.
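To make the point about centralized, string-only parsing concrete: once a YAML (or, later, JSON) document has been loaded into nested maps and lists, it can be flattened into dot-separated string keys, deliberately dropping the YAML scalar types so that one parser later decides what `true` or `12` means. The `PropertyFlattener` helper and the key layout (`constructor.0.value`, ...) are illustrative assumptions, not the exact format used by the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PropertyFlattener {
    private PropertyFlattener() {}

    // Flattens nested maps/lists into "a.b.0.c" -> "value" string properties.
    static Map<String, String> flatten(Map<String, Object> config) {
        Map<String, String> out = new TreeMap<>();
        flattenInto("", config, out);
        return out;
    }

    private static void flattenInto(String key, Object node, Map<String, String> out) {
        if (node instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                flattenInto(join(key, String.valueOf(e.getKey())), e.getValue(), out);
            }
        } else if (node instanceof List) {
            List<?> list = (List<?>) node;
            for (int i = 0; i < list.size(); i++) {
                flattenInto(join(key, Integer.toString(i)), list.get(i), out);
            }
        } else {
            // Leaf: YAML's own typing (Boolean, Integer, ...) is intentionally dropped.
            out.put(key, String.valueOf(node));
        }
    }

    private static String join(String prefix, String key) {
        return prefix.isEmpty() ? key : prefix + "." + key;
    }
}
```

After flattening, a YAML boolean `true` and a quoted `"true"` both become the string `true`; only an explicit `type: VARCHAR` entry next to it can tell the central parser to keep it as a string.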
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r198430280
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala ---
@@ -0,0 +1,72 @@
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
+import scala.collection.JavaConversions._
+
+/**
+ * Validator for [[PrimitiveTypeDescriptor]].
+ */
+class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
+ override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+ properties
+ .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false)
+ properties
+ .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1)
+ }
+}
+
+object PrimitiveTypeValidator {
+ val PRIMITIVE_TYPE = "type"
+ val PRIMITIVE_VALUE = "value"
+
+ def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
+ val typeInfo =
+ properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
+ val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
+ val value = typeInfo match {
+ case basicType: BasicTypeInfo[_] =>
+ basicType match {
+ case BasicTypeInfo.INT_TYPE_INFO =>
+ properties.getInt(valueKey)
+ case BasicTypeInfo.LONG_TYPE_INFO =>
+ properties.getLong(valueKey)
+ case BasicTypeInfo.DOUBLE_TYPE_INFO =>
+ properties.getDouble(valueKey)
+ case BasicTypeInfo.STRING_TYPE_INFO =>
+ properties.getString(valueKey)
+ case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
+ properties.getBoolean(valueKey)
+ //TODO add more types
--- End diff --
Thanks :)
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r196995942
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
@@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
private final StreamExecutionEnvironment streamExecEnv;
private final TableEnvironment tableEnv;
+ @SuppressWarnings("unchecked")
--- End diff --
The `AggregateFunction` and `TableFunction` take generic type parameters. Thus, when casting (e.g., `streamTableEnvironment.registerFunction(k, (AggregateFunction) udf);`), the Java compiler complains about an unchecked conversion.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r198427940
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
@@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
private final StreamExecutionEnvironment streamExecEnv;
private final TableEnvironment tableEnv;
+ @SuppressWarnings("unchecked")
--- End diff --
Weird. If I remove the annotation and add wildcards, my IDE does not show a warning:
`streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);`
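The difference between the two casts can be reproduced without Flink. In this sketch, `UserFn`, `ScalarFn`, `TableFn`, `AggFn`, `Registry` and `register` are hypothetical stand-ins for the UDF base classes and `registerFunction`, whose real signatures may differ in detail. Passing a raw `(AggFn) f` to a generic method requires an unchecked conversion, whereas `(AggFn<?, ?>) f` lets the compiler capture the wildcards and infer the method's type parameters, so no `@SuppressWarnings` is needed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for Flink's UDF base classes.
abstract class UserFn {}
abstract class ScalarFn extends UserFn {}
abstract class TableFn<T> extends UserFn {}
abstract class AggFn<T, ACC> extends UserFn {}

final class Registry {
    final Map<String, String> registered = new LinkedHashMap<>();

    void register(String name, ScalarFn f) { registered.put(name, "scalar"); }

    <T> void register(String name, TableFn<T> f) { registered.put(name, "table"); }

    <T, ACC> void register(String name, AggFn<T, ACC> f) { registered.put(name, "aggregate"); }

    // Dispatches on the function kind. The wildcard casts compile without
    // unchecked warnings; register(name, (AggFn) f) would not.
    void registerAny(String name, UserFn f) {
        if (f instanceof ScalarFn) {
            register(name, (ScalarFn) f);
        } else if (f instanceof TableFn) {
            register(name, (TableFn<?>) f);
        } else if (f instanceof AggFn) {
            register(name, (AggFn<?, ?>) f);
        } else {
            throw new IllegalArgumentException("Unknown function kind: " + f.getClass());
        }
    }
}
```

Wildcard types such as `AggFn<?, ?>` are reifiable, so the cast itself is fully checked at runtime; only the raw-type variant needs the suppression.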
---
[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6090
@xccui No problem. Great to hear that you graduated :)
The feature freeze is in approximately 3 weeks; we should be done by then. Let me know if you need support.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r198429917
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala ---
@@ -0,0 +1,56 @@
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+ * Descriptor for a primitive type.
+ */
+class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
+
+ // TODO not sure if we should use the BasicTypeInfo here
+ var typeInformation: BasicTypeInfo[T] = _
+ var value: T = _
+
+ def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
--- End diff --
We should only support very simple type inference logic for values without specified types. E.g.:
- "1" -> always int
- "1.0" -> always double
- "true/false" -> always boolean
- string otherwise
This covers 80% of all parameters. For everything else, declare a type and we will call `Byte.valueOf()` etc.
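The inference rules listed above are small enough to sketch directly. `ValueInference` is a hypothetical helper, not code from the PR; it checks the patterns in the listed order, matching is case-sensitive, and anything that is not a plain integer, a plain decimal or `true`/`false` stays a string. An integer literal that overflows `int` makes `Integer.valueOf` throw, which is arguably the point where a user should declare a type.

```java
import java.util.regex.Pattern;

final class ValueInference {
    private ValueInference() {}

    private static final Pattern INT = Pattern.compile("-?\\d+");
    private static final Pattern DOUBLE = Pattern.compile("-?\\d+\\.\\d+");

    // Infers a value for a parameter that has no explicit type.
    static Object infer(String raw) {
        if (INT.matcher(raw).matches()) {
            return Integer.valueOf(raw);   // "1" -> always int (overflow throws)
        }
        if (DOUBLE.matcher(raw).matches()) {
            return Double.valueOf(raw);    // "1.0" -> always double
        }
        if (raw.equals("true") || raw.equals("false")) {
            return Boolean.valueOf(raw);   // "true"/"false" -> always boolean
        }
        return raw;                        // string otherwise
    }
}
```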
---
[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6090
@xccui Perfect, thank you very much!
---
[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6090
Hi @xccui, sorry for being pushy, but when do you think you can update this PR? The feature freeze is in approx. 1 week and I need to coordinate my work/reviewing efforts. Let me know if you want me to take over.
---
[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:
https://github.com/apache/flink/pull/6090
Hi @twalthr, please give me one more day. I will commit the changes tomorrow. 😄
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r198485307
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala ---
@@ -0,0 +1,56 @@
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+ * Descriptor for a primitive type.
+ */
+class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
+
+ // TODO not sure if we should use the BasicTypeInfo here
+ var typeInformation: BasicTypeInfo[T] = _
+ var value: T = _
+
+ def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
--- End diff --
Sounds reasonable. Thanks.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6090
---
[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6090
Thank you very much @xccui! I will have a final pass over the changes and merge this. I think with this PR we cover the most important cases. I will open follow-up issues for docs etc. We can add those things after the feature freeze.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194426814
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
@@ -0,0 +1,153 @@
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
--- End diff --
I would just call this `Function`, because it is similar to the existing `Source` class.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194453136
--- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java ---
@@ -0,0 +1,105 @@
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+/**
+ * A bunch of UDFs for SQL-Client test.
+ */
+public class UserDefinedFunctions {
--- End diff --
`flink-table` already provides testing functions. Maybe you can add some there if needed.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194459557
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala ---
@@ -0,0 +1,29 @@
+
+package org.apache.flink.table.descriptors
+
+/**
+ * A descriptor that may exist in different levels (be included by other descriptors).
+ */
+abstract class HierarchyDescriptor extends Descriptor {
+
+ private[flink] def addPropertiesWithPrefix(
--- End diff --
Add an internal JavaDoc notice like in `addProperties`, because this method will be public in the Java API.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r196995241
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
@@ -101,6 +110,16 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
}
});
+ // generate user-defined functions
+ functions = new HashMap<>();
+ mergedEnv.getFunctions().forEach((name, descriptor) -> {
+ DescriptorProperties properties = new DescriptorProperties(true);
+ descriptor.addProperties(properties);
+ functions.put(
+ name,
+ FunctionValidator.generateUserDefinedFunction(properties, classLoader));
--- End diff --
That's a good idea.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194461789
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala ---
@@ -0,0 +1,56 @@
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+ * Descriptor for a primitive type.
+ */
+class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
+
+ // TODO not sure if we should use the BasicTypeInfo here
+ var typeInformation: BasicTypeInfo[T] = _
+ var value: T = _
+
+ def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
--- End diff --
This should be a regular `TypeInformation`. `TypeStringUtils` can stringify every valid `TypeInformation`.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194456301
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeDescriptor.scala ---
@@ -0,0 +1,71 @@
+
+package org.apache.flink.table.descriptors
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Descriptor for a class type.
--- End diff --
Descriptors might become public API in the future. We should add more comments and have a more fluent method scheme. I would propose something like:
```
ClassInstance().of("org.example.MyFunction")
.parameter("12.0")
.parameter("DOUBLE", "12")
.parameter(Types.DOUBLE, 12)
```
I think we don't need to expose the primitive type descriptor. We can use it internally though. Alternatively, we could also move functionality to `org.apache.flink.table.descriptors.DescriptorProperties` if applicable.
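A possible Java shape for the proposed fluent scheme, as a hedged sketch only: `ClassInstance`, `of` and `parameter` follow the proposal above, while `toProperties` and the property keys (`class`, `constructor.N.type`, `constructor.N.value`) are assumed for illustration. Untyped parameters are stored as plain strings so that type inference can happen later in one central place, and a nested `ClassInstance` writes its properties under its parent's prefix, mirroring the hierarchical idea behind `HierarchyDescriptor`. The `Types.DOUBLE` variant is omitted to keep the sketch free of Flink dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ClassInstance {
    private String className;
    // Each entry is either a String[] {typeOrNull, value} or a nested ClassInstance.
    private final List<Object> parameters = new ArrayList<>();

    ClassInstance of(String className) {
        this.className = className;
        return this;
    }

    // Untyped literal; its type is inferred later from the string.
    ClassInstance parameter(String value) {
        parameters.add(new String[] {null, value});
        return this;
    }

    // Explicitly typed literal, e.g. parameter("DOUBLE", "12").
    ClassInstance parameter(String type, String value) {
        parameters.add(new String[] {type, value});
        return this;
    }

    // Nested object passed to the constructor.
    ClassInstance parameter(ClassInstance nested) {
        parameters.add(nested);
        return this;
    }

    Map<String, String> toProperties() {
        Map<String, String> props = new TreeMap<>();
        addPropertiesWithPrefix("", props);
        return props;
    }

    // Writes this instance's properties under the given key prefix.
    private void addPropertiesWithPrefix(String prefix, Map<String, String> props) {
        props.put(prefix + "class", className);
        for (int i = 0; i < parameters.size(); i++) {
            String p = prefix + "constructor." + i + ".";
            Object param = parameters.get(i);
            if (param instanceof ClassInstance) {
                ((ClassInstance) param).addPropertiesWithPrefix(p, props);
            } else {
                String[] typed = (String[]) param;
                if (typed[0] != null) {
                    props.put(p + "type", typed[0]);
                }
                props.put(p + "value", typed[1]);
            }
        }
    }
}
```

Usage: `new ClassInstance().of("org.example.MyFunction").parameter("12.0").parameter("DOUBLE", "12").toProperties()` yields `class`, `constructor.0.value`, `constructor.1.type` and `constructor.1.value` entries; Java needs the explicit `new` that the Scala-style proposal omits.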
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194462064
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala ---
@@ -0,0 +1,72 @@
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
+import scala.collection.JavaConversions._
+
+/**
+ * Validator for [[PrimitiveTypeDescriptor]].
+ */
+class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
+ override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+ properties
+ .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false)
+ properties
+ .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1)
+ }
+}
+
+object PrimitiveTypeValidator {
+ val PRIMITIVE_TYPE = "type"
+ val PRIMITIVE_VALUE = "value"
+
+ def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
+ val typeInfo =
+ properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
+ val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
+ val value = typeInfo match {
+ case basicType: BasicTypeInfo[_] =>
+ basicType match {
+ case BasicTypeInfo.INT_TYPE_INFO =>
+ properties.getInt(valueKey)
+ case BasicTypeInfo.LONG_TYPE_INFO =>
+ properties.getLong(valueKey)
+ case BasicTypeInfo.DOUBLE_TYPE_INFO =>
+ properties.getDouble(valueKey)
+ case BasicTypeInfo.STRING_TYPE_INFO =>
+ properties.getString(valueKey)
+ case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
+ properties.getBoolean(valueKey)
+ //TODO add more types
+ case _ => throw TableException(s"Unsupported basic type ${basicType.getTypeClass}.")
+ }
+ case _ => throw TableException("Only primitive types are supported.")
+ }
+ value
+ }
+}
+
--- End diff --
Please remove the runs of more than two consecutive empty lines in several classes.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r196995191
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
@@ -0,0 +1,153 @@
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
--- End diff --
I'm not sure whether other descriptors (for other purposes) will extend `FunctionDescriptor` in the future. How about renaming it to `UserDefinedFunction`?
---
[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:
https://github.com/apache/flink/pull/6090
Hi @twalthr, sorry for the delay. I've been quite busy with my graduation over the past few weeks. Everything's finished now, and I'll get these tasks back on track.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r198484614
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
@@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
private final StreamExecutionEnvironment streamExecEnv;
private final TableEnvironment tableEnv;
+ @SuppressWarnings("unchecked")
--- End diff --
Oh, I forgot the wildcard `?`. Sorry about that.
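For context on the `@SuppressWarnings("unchecked")` point: the warning usually comes from using a raw type where a parameterized one is expected, and an unbounded wildcard `?` typically avoids it. The sketch below uses a hypothetical generic `Box<T>` and `register` method (not Flink classes) to show the difference; it is an illustration, not the code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic type standing in for something like TableFunction<T>.
class Box<T> {
    private final T value;

    Box(T value) {
        this.value = value;
    }

    T get() {
        return value;
    }
}

class WildcardDemo {
    // Records what was registered so the behavior can be checked.
    static final List<String> registered = new ArrayList<>();

    // Hypothetical generic method, comparable to a registerFunction(String, TableFunction<T>) overload.
    static <T> void register(String name, Box<T> box) {
        registered.add(name + "=" + box.get());
    }

    // Raw type: passing it to a generic method is an unchecked conversion,
    // so the warning has to be suppressed.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static void registerRaw(String name, Object obj) {
        Box raw = (Box) obj;
        register(name, raw);
    }

    // Wildcard: Box<?> is a checked cast, and capture conversion lets the
    // compiler infer T, so no suppression is needed.
    static void registerWildcard(String name, Object obj) {
        Box<?> box = (Box<?>) obj;
        register(name, box);
    }

    public static void main(String[] args) {
        registerRaw("a", new Box<>(1));
        registerWildcard("b", new Box<>("x"));
        System.out.println(registered); // [a=1, b=x]
    }
}
```

Casting to `Box<?>` is a checked cast because unbounded wildcard types are reifiable, and the generic call still type-checks through capture conversion.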
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r196994767
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
@@ -0,0 +1,153 @@
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
+
+ private static final String FROM = "from";
+
+ private From from;
+
+ private UDFDescriptor(String name, From from) {
+ super(name);
+ this.from = from;
+ }
+
+ public From getFrom() {
+ return from;
+ }
+
+ /**
+ * Create a UDF descriptor with the given config.
+ */
+ public static UDFDescriptor create(Map<String, Object> config) {
+ if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
+ throw new SqlClientException("The 'name' attribute of a function is missing.");
+ }
+
+ final Object name = config.get(FunctionValidator.FUNCTION_NAME());
+ if (!(name instanceof String) || ((String) name).length() <= 0) {
+ throw new SqlClientException("Invalid function name '" + name + "'.");
+ }
--- End diff --
At first, I did use `ConfigUtil.normalizeYaml` for that, but I found that the type information of some parameters was lost in the process; e.g., we could no longer tell whether a `false` is a boolean or a string.
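A minimal sketch of the problem described above, assuming a normalization step that simply turns every parsed YAML value into its string form. The `normalize` helper is a hypothetical, simplified stand-in and not the actual `ConfigUtil.normalizeYaml` implementation: once a boolean `false` and a quoted string `"false"` are both flattened to `"false"`, the original Java type can no longer be recovered.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NormalizationLossDemo {
    // Hypothetical simplified normalization: every value becomes a string.
    static Map<String, String> normalize(Map<String, Object> yaml) {
        Map<String, String> props = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : yaml.entrySet()) {
            props.put(e.getKey(), String.valueOf(e.getValue()));
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> parsed = new LinkedHashMap<>();
        parsed.put("constructor.0", Boolean.FALSE); // YAML: false
        parsed.put("constructor.1", "false");       // YAML: "false"

        Map<String, String> props = normalize(parsed);
        // Both entries are now the string "false"; the Boolean/String distinction is gone.
        System.out.println(props); // {constructor.0=false, constructor.1=false}
    }
}
```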
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r196996002
--- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ---
@@ -145,6 +146,68 @@ public void testTableSchema() throws Exception {
assertEquals(expectedTableSchema, actualTableSchema);
}
+ @Test(timeout = 30_000L)
+ public void testScalarUDF() throws Exception {
--- End diff --
I'll add more test cases for that.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r197005582
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala ---
@@ -0,0 +1,56 @@
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+ * Descriptor for a primitive type.
+ */
+class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
+
+ // TODO not sure if we should the BasicTypeInfo here
+ var typeInformation: BasicTypeInfo[T] = _
+ var value: T = _
+
+ def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
--- End diff --
Yes, `TypeStringUtils` can parse any valid type information, and the user can specify the type via the API (e.g., `setType(Types.SHORT)`). I just wonder how Java's basic types can be properly inferred from the config file (e.g., how to decide whether a parameter `1` is a byte, a short, or an int). Although a short or byte value can be represented as an int, the choice affects the constructor lookup via Java reflection. Do you have any ideas on that?
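The reflection concern can be illustrated with plain Java. In the sketch below, `ShortParamUdf` is a hypothetical class whose only constructor takes a `short`. Looking it up with `int.class` (what an untyped `1` from a config file would most naturally map to) fails, while `short.class` succeeds, because `Class.getConstructor` matches parameter types exactly.

```java
import java.lang.reflect.Constructor;

// Hypothetical UDF-like class whose constructor expects a short.
class ShortParamUdf {
    final short threshold;

    public ShortParamUdf(short threshold) {
        this.threshold = threshold;
    }
}

class ConstructorLookupDemo {
    // Returns true if a public constructor with exactly these parameter types exists.
    static boolean hasConstructor(Class<?> clazz, Class<?>... paramTypes) {
        try {
            clazz.getConstructor(paramTypes);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // An untyped "1" parsed as an int does not match the short constructor.
        System.out.println(hasConstructor(ShortParamUdf.class, int.class));   // false
        System.out.println(hasConstructor(ShortParamUdf.class, short.class)); // true

        // With the declared type known, the value must also be converted before invocation.
        Constructor<ShortParamUdf> ctor = ShortParamUdf.class.getConstructor(short.class);
        ShortParamUdf udf = ctor.newInstance((short) 1);
        System.out.println(udf.threshold); // 1
    }
}
```

`Constructor.newInstance` also refuses to narrow an `Integer` argument to a `short` parameter (it throws `IllegalArgumentException`), so both the lookup and the argument values depend on knowing the declared type.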
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r198482931
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
@@ -0,0 +1,153 @@
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
+
+ private static final String FROM = "from";
+
+ private From from;
+
+ private UDFDescriptor(String name, From from) {
+ super(name);
+ this.from = from;
+ }
+
+ public From getFrom() {
+ return from;
+ }
+
+ /**
+ * Create a UDF descriptor with the given config.
+ */
+ public static UDFDescriptor create(Map<String, Object> config) {
+ if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
+ throw new SqlClientException("The 'name' attribute of a function is missing.");
+ }
+
+ final Object name = config.get(FunctionValidator.FUNCTION_NAME());
+ if (!(name instanceof String) || ((String) name).length() <= 0) {
+ throw new SqlClientException("Invalid function name '" + name + "'.");
+ }
--- End diff --
I see. Thanks for the explanation.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r196998269
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala ---
@@ -0,0 +1,72 @@
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
+import scala.collection.JavaConversions._
+
+/**
+ * Validator for [[PrimitiveTypeDescriptor]].
+ */
+class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
+ override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+ properties
+ .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false)
+ properties
+ .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1)
+ }
+}
+
+object PrimitiveTypeValidator {
+ val PRIMITIVE_TYPE = "type"
+ val PRIMITIVE_VALUE = "value"
+
+ def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
+ val typeInfo =
+ properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
+ val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
+ val value = typeInfo match {
+ case basicType: BasicTypeInfo[_] =>
+ basicType match {
+ case BasicTypeInfo.INT_TYPE_INFO =>
+ properties.getInt(valueKey)
+ case BasicTypeInfo.LONG_TYPE_INFO =>
+ properties.getLong(valueKey)
+ case BasicTypeInfo.DOUBLE_TYPE_INFO =>
+ properties.getDouble(valueKey)
+ case BasicTypeInfo.STRING_TYPE_INFO =>
+ properties.getString(valueKey)
+ case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
+ properties.getBoolean(valueKey)
+ //TODO add more types
--- End diff --
Ah, yes. Array types were not considered. I'll think about it and add support if it's not hard to implement; otherwise, we could move it to a follow-up issue.
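One possible way to cover the remaining basic types from the `//TODO add more types` branch, plus a simple form of array support, is sketched below. It assumes a plain type-name string and a string value rather than Flink's `DescriptorProperties`/`BasicTypeInfo`, and the `TYPE[]` naming with a comma-separated value encoding is a hypothetical choice for illustration only.

```java
import java.lang.reflect.Array;
import java.util.Arrays;

// Hypothetical parser: type names and the "TYPE[]" + comma-separated array
// encoding are assumptions for illustration, not Flink's property format.
class PrimitiveValueParser {

    // Parses a single value for the given type name.
    static Object parseScalar(String type, String value) {
        switch (type) {
            case "BYTE":   return Byte.parseByte(value);
            case "SHORT":  return Short.parseShort(value);
            case "INT":    return Integer.parseInt(value);
            case "LONG":   return Long.parseLong(value);
            case "FLOAT":  return Float.parseFloat(value);
            case "DOUBLE": return Double.parseDouble(value);
            case "BOOLEAN":
                // Reject anything other than true/false instead of silently returning false.
                if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
                    return Boolean.parseBoolean(value);
                }
                throw new IllegalArgumentException("Not a boolean: " + value);
            case "STRING": return value;
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // Java class used for array components of the given type name.
    static Class<?> componentClass(String type) {
        switch (type) {
            case "BYTE":    return byte.class;
            case "SHORT":   return short.class;
            case "INT":     return int.class;
            case "LONG":    return long.class;
            case "FLOAT":   return float.class;
            case "DOUBLE":  return double.class;
            case "BOOLEAN": return boolean.class;
            case "STRING":  return String.class;
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // Parses a scalar, or a primitive/String array for type names ending in "[]".
    static Object parse(String type, String value) {
        if (type.endsWith("[]")) {
            String elementType = type.substring(0, type.length() - 2);
            String[] parts = value.isEmpty() ? new String[0] : value.split(",");
            Object array = Array.newInstance(componentClass(elementType), parts.length);
            for (int i = 0; i < parts.length; i++) {
                // Elements are trimmed; Array.set unboxes values into primitive arrays.
                Array.set(array, i, parseScalar(elementType, parts[i].trim()));
            }
            return array;
        }
        return parseScalar(type, value);
    }

    public static void main(String[] args) {
        System.out.println(parse("SHORT", "1").getClass().getSimpleName()); // Short
        System.out.println(Arrays.toString((int[]) parse("INT[]", "1, 2, 3"))); // [1, 2, 3]
    }
}
```

Producing real primitive arrays (`int[]` rather than `Integer[]`) matters if the result is later used for a reflective constructor lookup.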
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r198424133
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
@@ -0,0 +1,153 @@
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
--- End diff --
Sounds good to me.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r198428514
--- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java ---
@@ -0,0 +1,105 @@
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+/**
+ * A bunch of UDFs for SQL-Client test.
+ */
+public class UserDefinedFunctions {
--- End diff --
ok
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194458815
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala ---
@@ -0,0 +1,48 @@
+
+package org.apache.flink.table.descriptors
+
+/**
+ * Function Descriptor
+ */
+class FunctionDescriptor(var name: String) extends Descriptor {
--- End diff --
As mentioned before, we should document every method and use more fluent method names, e.g.:
```
Function.using(...)
```
I know `Function` is pretty generic, but we do the same with `Kafka`, `Filesystem`, etc.
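To make the suggestion concrete, a fluent, fully documented descriptor could look roughly like the sketch below. The class name `FunctionSketch`, its methods `using`, `withParam`, and `toProperties`, and the `class` property key are hypothetical and not Flink APIs; the `constructor.N.type`/`constructor.N.value` layout follows the keys used elsewhere in this thread. The point is the builder style, with Javadoc on every method.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical fluent descriptor for a user-defined function (illustration only). */
final class FunctionSketch {

    private final Map<String, String> properties = new LinkedHashMap<>();
    private int paramCount = 0;

    private FunctionSketch(String className) {
        properties.put("class", className);
    }

    /**
     * Starts a descriptor for the function implemented by the given class.
     *
     * @param className fully qualified name of the function class
     */
    static FunctionSketch using(String className) {
        return new FunctionSketch(className);
    }

    /**
     * Appends a constructor parameter with an explicit type, so the type
     * does not have to be inferred from the value.
     *
     * @param type  type name, e.g. "INT" or "STRING"
     * @param value string representation of the value
     */
    FunctionSketch withParam(String type, String value) {
        properties.put("constructor." + paramCount + ".type", type);
        properties.put("constructor." + paramCount + ".value", value);
        paramCount++;
        return this;
    }

    /** Returns a copy of the flat, string-based properties describing this function. */
    Map<String, String> toProperties() {
        return new LinkedHashMap<>(properties);
    }

    public static void main(String[] args) {
        Map<String, String> props = FunctionSketch
            .using("com.example.MyScalarFunction")
            .withParam("INT", "42")
            .withParam("STRING", "abc")
            .toProperties();
        System.out.println(props);
    }
}
```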
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194442694
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
@@ -0,0 +1,153 @@
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
+
+ private static final String FROM = "from";
+
+ private From from;
+
+ private UDFDescriptor(String name, From from) {
+ super(name);
+ this.from = from;
+ }
+
+ public From getFrom() {
+ return from;
+ }
+
+ /**
+ * Create a UDF descriptor with the given config.
+ */
+ public static UDFDescriptor create(Map<String, Object> config) {
+ if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
+ throw new SqlClientException("The 'name' attribute of a function is missing.");
+ }
+
+ final Object name = config.get(FunctionValidator.FUNCTION_NAME());
+ if (!(name instanceof String) || ((String) name).length() <= 0) {
+ throw new SqlClientException("Invalid function name '" + name + "'.");
+ }
--- End diff --
The following lines contain a lot of casting and assumptions about data types. We should instead do it as we do for `Source`: first convert the incoming YAML to string-based properties using `ConfigUtil.normalizeYaml`. Both the Table API and the SQL Client can then share one code path for validation, as we already do for sources.
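A minimal sketch of the kind of normalization suggested above: flattening the nested map/list structure produced by a YAML parser into dot-separated string keys such as `constructor.0.value`. The `YamlFlattener` helper is hypothetical and only approximates the idea; it is not the actual `ConfigUtil.normalizeYaml` implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical flattener approximating string-based property normalization.
class YamlFlattener {
    // Recursively flattens nested maps and lists into "a.b.0.c"-style keys.
    static Map<String, String> flatten(Map<String, Object> yaml) {
        Map<String, String> out = new LinkedHashMap<>();
        flattenInto("", yaml, out);
        return out;
    }

    private static void flattenInto(String prefix, Object node, Map<String, String> out) {
        if (node instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                flattenInto(join(prefix, String.valueOf(e.getKey())), e.getValue(), out);
            }
        } else if (node instanceof List) {
            List<?> list = (List<?>) node;
            for (int i = 0; i < list.size(); i++) {
                flattenInto(join(prefix, Integer.toString(i)), list.get(i), out);
            }
        } else {
            // Leaf: store the string form; the original Java type is not kept.
            out.put(prefix, String.valueOf(node));
        }
    }

    private static String join(String prefix, String key) {
        return prefix.isEmpty() ? key : prefix + "." + key;
    }

    public static void main(String[] args) {
        Map<String, Object> param = new LinkedHashMap<>();
        param.put("type", "INT");
        param.put("value", 1);
        Map<String, Object> function = new LinkedHashMap<>();
        function.put("name", "myUdf");
        function.put("constructor", Arrays.asList(param, "abc"));
        System.out.println(flatten(function));
        // {name=myUdf, constructor.0.type=INT, constructor.0.value=1, constructor.1=abc}
    }
}
```

Working on flat string properties is what lets a single validator serve both YAML input and a programmatic API; the trade-off is that leaf types must then be carried explicitly (e.g., via `.type` keys).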
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194452740
--- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ---
@@ -145,6 +146,68 @@ public void testTableSchema() throws Exception {
assertEquals(expectedTableSchema, actualTableSchema);
}
+ @Test(timeout = 30_000L)
+ public void testScalarUDF() throws Exception {
--- End diff --
Full integration tests were needed in the beginning, but now we should rely more on dedicated unit tests. Maybe extend one existing ITCase with a function and check general correctness in a new test class.
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r194451816
--- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
@@ -101,6 +110,16 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
}
});
+ // generate user-defined functions
+ functions = new HashMap<>();
+ mergedEnv.getFunctions().forEach((name, descriptor) -> {
+ DescriptorProperties properties = new DescriptorProperties(true);
+ descriptor.addProperties(properties);
+ functions.put(
+ name,
+ FunctionValidator.generateUserDefinedFunction(properties, classLoader));
--- End diff --
Maybe move the generation logic from `FunctionValidator.generateUserDefinedFunction` to some class `FunctionService`?
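A hypothetical `FunctionService` along these lines could take the flat properties and a class loader and do the reflective instantiation in one place. The sketch below assumes the property layout `class`, `constructor.N.type`, `constructor.N.value` (the `class` key is an assumption), supports only a few primitive types, and is not Flink code.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical function-like class used to exercise the service.
class GreetingFunction {
    final String greeting;
    final int repeat;

    public GreetingFunction(String greeting, int repeat) {
        this.greeting = greeting;
        this.repeat = repeat;
    }
}

/** Hypothetical service that instantiates a function from string properties. */
final class FunctionService {

    private FunctionService() {}

    static Object createFunction(Map<String, String> props, ClassLoader classLoader)
            throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(props.get("class"), true, classLoader);

        // Collect constructor.0, constructor.1, ... until the first missing index.
        List<Class<?>> types = new ArrayList<>();
        List<Object> values = new ArrayList<>();
        for (int i = 0; props.containsKey("constructor." + i + ".type"); i++) {
            String type = props.get("constructor." + i + ".type");
            String value = props.get("constructor." + i + ".value");
            switch (type) {
                case "INT":
                    types.add(int.class);
                    values.add(Integer.parseInt(value));
                    break;
                case "LONG":
                    types.add(long.class);
                    values.add(Long.parseLong(value));
                    break;
                case "STRING":
                    types.add(String.class);
                    values.add(value);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported type: " + type);
            }
        }

        // Exact-match lookup: the declared parameter types must line up precisely.
        Constructor<?> ctor = clazz.getConstructor(types.toArray(new Class<?>[0]));
        return ctor.newInstance(values.toArray());
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put("class", GreetingFunction.class.getName());
        props.put("constructor.0.type", "STRING");
        props.put("constructor.0.value", "hello");
        props.put("constructor.1.type", "INT");
        props.put("constructor.1.value", "2");
        GreetingFunction f =
            (GreetingFunction) createFunction(props, FunctionService.class.getClassLoader());
        System.out.println(f.greeting + " x" + f.repeat); // hello x2
    }
}
```

Keeping instantiation in a separate service would let the validator stay responsible only for checking properties, while callers such as the SQL Client's execution context reuse one creation path.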
---
[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/6090#discussion_r196996876
--- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java ---
@@ -0,0 +1,105 @@
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+/**
+ * A bunch of UDFs for SQL-Client test.
+ */
+public class UserDefinedFunctions {
--- End diff --
The testing functions provided in `flink-table` are mainly intended for functional tests, whereas the functions added here are mainly intended for constructor tests; i.e., I deliberately made the UDF constructors complex, or even somewhat unreasonable...
---
[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:
https://github.com/apache/flink/pull/6090
Hi @twalthr, I've made some changes to the PR.
1. Add a normalize method to `ClassTypeValidator`, which converts a config like `constructor.0 = abc` into `constructor.0.type = STRING` and `constructor.0.value = abc`.
2. Enrich the `DescriptorProperties` with more types.
3. Add a `getListProperties` method, which returns all values under group-list-formed keys, e.g., `constructor` returns all `constructor.#` entries.
4. Refine the descriptor APIs.
I know there is still a lot of work to do (e.g., adding more tests, documenting the features, and supporting array-typed parameters). I'll continue working on it if there is still time before the deadline.
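Items 1 and 3 of the list above can be illustrated together. The sketch below is hypothetical: the helper names `normalizeConstructor` and `listIndices` are not the PR's `ClassTypeValidator` or `DescriptorProperties` methods. It expands a shorthand entry such as `constructor.0 = abc` into `constructor.0.type = STRING` and `constructor.0.value = abc` (assuming untyped shorthand values are treated as strings), then collects all indices that appear under the group-list key `constructor.#`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helpers illustrating shorthand normalization and list-key lookup.
class ConstructorNormalizer {
    // Matches "constructor.N" exactly (a shorthand value without an explicit type).
    private static final Pattern SHORTHAND = Pattern.compile("constructor\\.(\\d+)");
    // Matches "constructor.N" optionally followed by ".<anything>".
    private static final Pattern INDEXED = Pattern.compile("constructor\\.(\\d+)(\\..*)?");

    // Expands "constructor.N = v" into explicit type/value entries (STRING assumed).
    static Map<String, String> normalizeConstructor(Map<String, String> props) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (SHORTHAND.matcher(e.getKey()).matches()) {
                out.put(e.getKey() + ".type", "STRING");
                out.put(e.getKey() + ".value", e.getValue());
            } else {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    // Returns the sorted set of indices N that appear under "constructor.N...".
    static TreeSet<Integer> listIndices(Map<String, String> props) {
        TreeSet<Integer> indices = new TreeSet<>();
        for (String key : props.keySet()) {
            Matcher m = INDEXED.matcher(key);
            if (m.matches()) {
                indices.add(Integer.parseInt(m.group(1)));
            }
        }
        return indices;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("constructor.0", "abc");
        props.put("constructor.1.type", "INT");
        props.put("constructor.1.value", "42");

        Map<String, String> normalized = normalizeConstructor(props);
        System.out.println(normalized);
        // {constructor.0.type=STRING, constructor.0.value=abc, constructor.1.type=INT, constructor.1.value=42}
        System.out.println(listIndices(normalized)); // [0, 1]
    }
}
```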
---