Posted to issues@flink.apache.org by xccui <gi...@git.apache.org> on 2018/05/28 17:51:22 UTC

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

GitHub user xccui opened a pull request:

    https://github.com/apache/flink/pull/6090

    [FLINK-8863] [SQL] Add user-defined function support in SQL Client

    ## What is the purpose of the change
    
    This PR aims to add user-defined function (ScalarFunction, TableFunction and AggregateFunction) support to the SQL Client.
    
    ## Brief change log
    
      - Introduce a new `HierarchyDescriptor` and its corresponding validator `HierarchyDescriptorValidator`, which allow constructing descriptors hierarchically.
      - Add a `PrimitiveTypeDescriptor` to describe a primitive type value and a `ClassTypeDescriptor` to describe a class type value. A `ClassTypeDescriptor` contains a `constructor` field, which is composed of a list of `PrimitiveTypeDescriptor` or `ClassTypeDescriptor`.
      - Add a `UDFDescriptor` and its base class `FunctionDescriptor` to describe a `UserDefinedFunction`. Given a `DescriptorProperties`, a `UserDefinedFunction` can be instantiated with the `FunctionValidator.generateUserDefinedFunction()` method.
      - Add related tests for the new components.
    
    ## Verifying this change
    
    The change can be verified with the test cases added in `LocalExecutorITCase.java`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (*the doc has not been finished yet*)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xccui/flink FLINK-8863-udf

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6090
    
----
commit c6c7d63f96eda01e24735271859ea8528e229021
Author: Xingcan Cui <xi...@...>
Date:   2018-05-27T15:36:25Z

    [FLINK-8863] Add user-defined function support in SQL Client

----


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194463649
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Validator for [[PrimitiveTypeDescriptor]].
    +  */
    +class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
    +  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
    +    properties
    +      .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false)
    +    properties
    +      .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1)
    +  }
    +}
    +
    +object PrimitiveTypeValidator {
    +  val PRIMITIVE_TYPE = "type"
    +  val PRIMITIVE_VALUE = "value"
    +
    +  def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
    +    val typeInfo =
    +      properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
    +    val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
    +    val value = typeInfo match {
    +      case basicType: BasicTypeInfo[_] =>
    +        basicType match {
    +          case BasicTypeInfo.INT_TYPE_INFO =>
    +            properties.getInt(valueKey)
    +          case BasicTypeInfo.LONG_TYPE_INFO =>
    +            properties.getLong(valueKey)
    +          case BasicTypeInfo.DOUBLE_TYPE_INFO =>
    +            properties.getDouble(valueKey)
    +          case BasicTypeInfo.STRING_TYPE_INFO =>
    +            properties.getString(valueKey)
    +          case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
    +            properties.getBoolean(valueKey)
    +          //TODO add more types
    --- End diff --
    
    Would be great if you could add support for all Java basic types. We can do arrays in a follow-up issue or on user request.
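
To illustrate what covering all Java basic types could look like, here is a minimal sketch in plain Java. It does not use Flink's `DescriptorProperties` or `BasicTypeInfo`. The class `BasicTypeParser`, the method `parse`, and the type names (`BYTE`, `SHORT`, ...) are hypothetical placeholders chosen for this example, not the type strings Flink actually accepts.

```java
import java.util.Locale;

// Hypothetical helper: converts a string property into a boxed Java basic type.
// The type names below are placeholders for this sketch, not Flink's type strings.
final class BasicTypeParser {

    private BasicTypeParser() {
    }

    static Object parse(String typeName, String raw) {
        switch (typeName.toUpperCase(Locale.ROOT)) {
            case "BYTE":
                return Byte.valueOf(raw);
            case "SHORT":
                return Short.valueOf(raw);
            case "INT":
                return Integer.valueOf(raw);
            case "LONG":
                return Long.valueOf(raw);
            case "FLOAT":
                return Float.valueOf(raw);
            case "DOUBLE":
                return Double.valueOf(raw);
            case "BOOLEAN":
                // Boolean.valueOf maps every non-"true" string to false, so check explicitly.
                if (!raw.equalsIgnoreCase("true") && !raw.equalsIgnoreCase("false")) {
                    throw new IllegalArgumentException("Not a boolean: " + raw);
                }
                return Boolean.valueOf(raw);
            case "CHAR":
                if (raw.length() != 1) {
                    throw new IllegalArgumentException("Not a single character: " + raw);
                }
                return raw.charAt(0);
            case "STRING":
                return raw;
            default:
                throw new IllegalArgumentException("Unsupported type: " + typeName);
        }
    }
}
```

Malformed numbers surface as `NumberFormatException` from the `valueOf` calls. A real validator would probably wrap these in its own exception type.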


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194451906
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
    @@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
     		private final StreamExecutionEnvironment streamExecEnv;
     		private final TableEnvironment tableEnv;
     
    +		@SuppressWarnings("unchecked")
    --- End diff --
    
    Remove this?


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r198426083
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.config;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.client.SqlClientException;
    +import org.apache.flink.table.descriptors.ClassTypeDescriptor;
    +import org.apache.flink.table.descriptors.ClassTypeValidator;
    +import org.apache.flink.table.descriptors.DescriptorProperties;
    +import org.apache.flink.table.descriptors.FunctionDescriptor;
    +import org.apache.flink.table.descriptors.FunctionValidator;
    +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
    +import org.apache.flink.table.typeutils.TypeStringUtils;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
    +
    +/**
    + * Descriptor for user-defined functions.
    + */
    +public class UDFDescriptor extends FunctionDescriptor {
    +
    +	private static final String FROM = "from";
    +
    +	private From from;
    +
    +	private UDFDescriptor(String name, From from) {
    +		super(name);
    +		this.from = from;
    +	}
    +
    +	public From getFrom() {
    +		return from;
    +	}
    +
    +	/**
    +	 * Create a UDF descriptor with the given config.
    +	 */
    +	public static UDFDescriptor create(Map<String, Object> config) {
    +		if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
    +			throw new SqlClientException("The 'name' attribute of a function is missing.");
    +		}
    +
    +		final Object name = config.get(FunctionValidator.FUNCTION_NAME());
    +		if (!(name instanceof String) || ((String) name).length() <= 0) {
    +			throw new SqlClientException("Invalid function name '" + name + "'.");
    +		}
    --- End diff --
    
    I think it is ok to lose the type information from YAML. The decision about a value being boolean or string should depend on the pure-string property. We will also support JSON and other translation paths in the future, so the parsing logic should be centralized. Let's treat `true`/`false` strings as booleans. If a value should be interpreted as a string, a user can specify `type: VARCHAR`.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r198430280
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Validator for [[PrimitiveTypeDescriptor]].
    +  */
    +class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
    +  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
    +    properties
    +      .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false)
    +    properties
    +      .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1)
    +  }
    +}
    +
    +object PrimitiveTypeValidator {
    +  val PRIMITIVE_TYPE = "type"
    +  val PRIMITIVE_VALUE = "value"
    +
    +  def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
    +    val typeInfo =
    +      properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
    +    val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
    +    val value = typeInfo match {
    +      case basicType: BasicTypeInfo[_] =>
    +        basicType match {
    +          case BasicTypeInfo.INT_TYPE_INFO =>
    +            properties.getInt(valueKey)
    +          case BasicTypeInfo.LONG_TYPE_INFO =>
    +            properties.getLong(valueKey)
    +          case BasicTypeInfo.DOUBLE_TYPE_INFO =>
    +            properties.getDouble(valueKey)
    +          case BasicTypeInfo.STRING_TYPE_INFO =>
    +            properties.getString(valueKey)
    +          case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
    +            properties.getBoolean(valueKey)
    +          //TODO add more types
    --- End diff --
    
    Thanks :)


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r196995942
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
    @@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
     		private final StreamExecutionEnvironment streamExecEnv;
     		private final TableEnvironment tableEnv;
     
    +		@SuppressWarnings("unchecked")
    --- End diff --
    
    `AggregateFunction` and `TableFunction` take generic type parameters. Thus, when doing conversions (e.g., `streamTableEnvironment.registerFunction(k, (AggregateFunction) udf);`), the Java compiler complains about an unchecked operation.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r198427940
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
    @@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
     		private final StreamExecutionEnvironment streamExecEnv;
     		private final TableEnvironment tableEnv;
     
    +		@SuppressWarnings("unchecked")
    --- End diff --
    
    Weird. If I remove the annotation and add wildcards, my IDE does not show a warning:
    `streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);`
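
The difference between the raw cast and the wildcard cast can be reproduced outside Flink. The sketch below uses hypothetical stand-ins (`Agg`, `CountAgg`, `Registry`) instead of the real `AggregateFunction` and table environment classes. It only assumes that the registration method is generic in the function's type parameters, as `registerFunction` appears to be from the discussion above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a generic UDF base class such as AggregateFunction<T, ACC>.
abstract class Agg<T, ACC> {
    abstract ACC createAccumulator();
}

// A concrete function used only to exercise the sketch.
final class CountAgg extends Agg<Long, long[]> {
    @Override
    long[] createAccumulator() {
        return new long[1];
    }
}

final class Registry {
    private final Map<String, Agg<?, ?>> functions = new HashMap<>();

    // Generic in T and ACC, in the spirit of registerFunction(name, function).
    <T, ACC> void register(String name, Agg<T, ACC> function) {
        functions.put(name, function);
    }

    int size() {
        return functions.size();
    }

    // Registers an instance whose static type is only Object,
    // e.g. one created reflectively from a configuration file.
    static Registry registerUntyped(String name, Object udf) {
        Registry registry = new Registry();
        if (udf instanceof Agg) {
            // A raw cast, (Agg) udf, would turn the call into an unchecked invocation.
            // Agg<?, ?> is a reifiable type, so this cast is fully checked, and
            // capture conversion lets the compiler infer T and ACC for register().
            registry.register(name, (Agg<?, ?>) udf);
        }
        return registry;
    }
}
```

With the wildcard cast, no `@SuppressWarnings("unchecked")` should be needed for this call, which matches the IDE behaviour described above.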


---

[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6090
  
    @xccui No problem. Great to hear that you graduated :)
    The feature freeze is in approximately 3 weeks, so we should be done by then. Let me know if you need support.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r198429917
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.typeutils.TypeStringUtils
    +
    +/**
    +  * Descriptor for a primitive type.
    +  */
    +class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
    +
    +  // TODO not sure if we should the BasicTypeInfo here
    +  var typeInformation: BasicTypeInfo[T] = _
    +  var value: T = _
    +
    +  def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
    --- End diff --
    
    We should only support very simple type inference logic for values without specified types. E.g.:
    - "1" -> always int
    - "1.0" -> always double
    - "true/false" -> always boolean
    - string otherwise
    
    This covers 80% of all parameters. For everything else, declare a type and we will call `Byte.valueOf()` etc.
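
The four inference rules listed above can be sketched in a few lines of plain Java. The class name `ValueInference` and the method `infer` are hypothetical. The regular expressions are one possible reading of "looks like an int" and "looks like a double", not something specified in this thread.

```java
import java.util.regex.Pattern;

// Hypothetical helper implementing the rules: int, double, boolean, otherwise string.
final class ValueInference {

    private static final Pattern INT_PATTERN = Pattern.compile("-?\\d+");
    private static final Pattern DOUBLE_PATTERN = Pattern.compile("-?\\d+\\.\\d+");

    private ValueInference() {
    }

    static Object infer(String raw) {
        if (raw.equals("true") || raw.equals("false")) {
            return Boolean.valueOf(raw);
        }
        if (INT_PATTERN.matcher(raw).matches()) {
            try {
                return Integer.valueOf(raw);
            } catch (NumberFormatException e) {
                // Out of int range: keep the string; the user can declare a type instead.
                return raw;
            }
        }
        if (DOUBLE_PATTERN.matcher(raw).matches()) {
            return Double.valueOf(raw);
        }
        return raw;
    }
}
```

Under these assumptions, `infer("1")` yields an `Integer`, `infer("1.0")` a `Double`, `infer("true")` a `Boolean`, and anything else (including `"1e3"` or a value too large for `int`) stays a `String`. Values that need another type would go through an explicit type declaration.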


---

[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6090
  
    @xccui perfect. thank you very much!


---

[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6090
  
    Hi @xccui, sorry for being pushy but when do you think you can update this PR? The feature freeze is in approx. 1 week and I need to coordinate my work/reviewing efforts. Let me know if you want me to take over.


---

[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/6090
  
    Hi @twalthr, please give me one more day. I will commit the changes tomorrow. 😄


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r198485307
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.typeutils.TypeStringUtils
    +
    +/**
    +  * Descriptor for a primitive type.
    +  */
    +class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
    +
    +  // TODO not sure if we should the BasicTypeInfo here
    +  var typeInformation: BasicTypeInfo[T] = _
    +  var value: T = _
    +
    +  def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
    --- End diff --
    
    Sounds reasonable. Thanks.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6090


---

[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6090
  
    Thank you very much @xccui! I will have a final pass over the changes and merge this. I think with this PR we cover the most important cases. I will open follow-up issues for docs etc. We can add those things after the feature freeze.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194426814
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.config;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.client.SqlClientException;
    +import org.apache.flink.table.descriptors.ClassTypeDescriptor;
    +import org.apache.flink.table.descriptors.ClassTypeValidator;
    +import org.apache.flink.table.descriptors.DescriptorProperties;
    +import org.apache.flink.table.descriptors.FunctionDescriptor;
    +import org.apache.flink.table.descriptors.FunctionValidator;
    +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
    +import org.apache.flink.table.typeutils.TypeStringUtils;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
    +
    +/**
    + * Descriptor for user-defined functions.
    + */
    +public class UDFDescriptor extends FunctionDescriptor {
    --- End diff --
    
    I would just call this `Function`. Because it is similar to the existing `Source` class.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194453136
  
    --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.gateway.utils;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.api.Types;
    +import org.apache.flink.table.functions.AggregateFunction;
    +import org.apache.flink.table.functions.ScalarFunction;
    +import org.apache.flink.table.functions.TableFunction;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * A bunch of UDFs for SQL-Client test.
    + */
    +public class UserDefinedFunctions {
    --- End diff --
    
    `flink-table` already provides testing functions. Maybe you can add some there if needed.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194459557
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/HierarchyDescriptor.scala ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +/**
    +  * A descriptor that may exist in different levels (be included by other descriptors).
    +  */
    +abstract class HierarchyDescriptor extends Descriptor {
    +
    +  private[flink] def addPropertiesWithPrefix(
    --- End diff --
    
    Add an internal JavaDoc notice like in `addProperties` because this method will be public in Java API.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r196995241
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
    @@ -101,6 +110,16 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
     			}
     		});
     
    +		// generate user-defined functions
    +		functions = new HashMap<>();
    +		mergedEnv.getFunctions().forEach((name, descriptor) -> {
    +			DescriptorProperties properties = new DescriptorProperties(true);
    +			descriptor.addProperties(properties);
    +			functions.put(
    +					name,
    +					FunctionValidator.generateUserDefinedFunction(properties, classLoader));
    --- End diff --
    
    That's a good idea.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194461789
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.typeutils.TypeStringUtils
    +
    +/**
    +  * Descriptor for a primitive type.
    +  */
    +class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
    +
    +  // TODO not sure if we should the BasicTypeInfo here
    +  var typeInformation: BasicTypeInfo[T] = _
    +  var value: T = _
    +
    +  def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
    --- End diff --
    
    This should be regular type information. The `TypeStringUtils` can stringify every valid type information. 


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194456301
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ClassTypeDescriptor.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +/**
    +  * Descriptor for a class type.
    --- End diff --
    
    Descriptors might become public API in the future. We should add more comments and have a more fluent method scheme. I would propose something like:
    
    ```
    ClassInstance().of("org.example.MyFunction")
      .parameter("12.0")
      .parameter("DOUBLE", "12")
      .parameter(Types.DOUBLE, 12)
    ```
    
    I think we don't need to expose the primitive type descriptor. We can use it internally though. Alternatively, we could also move functionality to `org.apache.flink.table.descriptors.DescriptorProperties` if applicable.
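    
    To make the proposal concrete, the fluent scheme could look roughly like the sketch below. It is plain Java and only an illustration: `ClassInstanceSketch`, its `toProperties()` method, and the `constructor.N.type` / `constructor.N.value` key layout are assumptions, not the descriptor API of this PR. The `Types.DOUBLE` overload is left out so that the sketch has no Flink dependency.
    
    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical fluent class-instance descriptor; not Flink's actual API.
    final class ClassInstanceSketch {

        private String className;
        // Each parameter is stored as a {type, value} pair of strings.
        private final List<String[]> parameters = new ArrayList<>();

        // Sets the fully qualified name of the class to instantiate.
        ClassInstanceSketch of(String className) {
            this.className = className;
            return this;
        }

        // Adds a parameter without an explicit type (type stays null).
        ClassInstanceSketch parameter(String value) {
            parameters.add(new String[] {null, value});
            return this;
        }

        // Adds a parameter with an explicit type string such as "DOUBLE".
        ClassInstanceSketch parameter(String type, String value) {
            parameters.add(new String[] {type, value});
            return this;
        }

        // Flattens the descriptor into string properties (assumed key layout).
        Map<String, String> toProperties() {
            Map<String, String> props = new LinkedHashMap<>();
            props.put("class", className);
            for (int i = 0; i < parameters.size(); i++) {
                String[] p = parameters.get(i);
                if (p[0] != null) {
                    props.put("constructor." + i + ".type", p[0]);
                }
                props.put("constructor." + i + ".value", p[1]);
            }
            return props;
        }

        public static void main(String[] args) {
            Map<String, String> props = new ClassInstanceSketch()
                .of("org.example.MyFunction")
                .parameter("12.0")
                .parameter("DOUBLE", "12")
                .toProperties();
            props.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
    ```
    
    Returning `this` from every setter is what allows the chained calls; the primitive type descriptor can then stay an internal detail behind the `parameter(...)` overloads.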


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194462064
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Validator for [[PrimitiveTypeDescriptor]].
    +  */
    +class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
    +  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
    +    properties
    +      .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false)
    +    properties
    +      .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1)
    +  }
    +}
    +
    +object PrimitiveTypeValidator {
    +  val PRIMITIVE_TYPE = "type"
    +  val PRIMITIVE_VALUE = "value"
    +
    +  def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
    +    val typeInfo =
    +      properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
    +    val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
    +    val value = typeInfo match {
    +      case basicType: BasicTypeInfo[_] =>
    +        basicType match {
    +          case BasicTypeInfo.INT_TYPE_INFO =>
    +            properties.getInt(valueKey)
    +          case BasicTypeInfo.LONG_TYPE_INFO =>
    +            properties.getLong(valueKey)
    +          case BasicTypeInfo.DOUBLE_TYPE_INFO =>
    +            properties.getDouble(valueKey)
    +          case BasicTypeInfo.STRING_TYPE_INFO =>
    +            properties.getString(valueKey)
    +          case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
    +            properties.getBoolean(valueKey)
    +          //TODO add more types
    +          case _ => throw TableException(s"Unsupported basic type ${basicType.getTypeClass}.")
    +        }
    +      case _ => throw TableException("Only primitive types are supported.")
    +    }
    +    value
    +  }
    +}
    +
    --- End diff --
    
    Please remove the runs of more than two consecutive empty lines; they appear in several classes.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r196995191
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.config;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.client.SqlClientException;
    +import org.apache.flink.table.descriptors.ClassTypeDescriptor;
    +import org.apache.flink.table.descriptors.ClassTypeValidator;
    +import org.apache.flink.table.descriptors.DescriptorProperties;
    +import org.apache.flink.table.descriptors.FunctionDescriptor;
    +import org.apache.flink.table.descriptors.FunctionValidator;
    +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
    +import org.apache.flink.table.typeutils.TypeStringUtils;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
    +
    +/**
    + * Descriptor for user-defined functions.
    + */
    +public class UDFDescriptor extends FunctionDescriptor {
    --- End diff --
    
    I'm not sure whether more descriptors (for other purposes) will extend `FunctionDescriptor` in the future. How about renaming it to `UserDefinedFunction`?


---

[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/6090
  
    Hi @twalthr, sorry for the delay. I've been quite busy with my graduation these weeks. Everything's finished now and I'll put these tasks back on track.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r198484614
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
    @@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
     		private final StreamExecutionEnvironment streamExecEnv;
     		private final TableEnvironment tableEnv;
     
    +		@SuppressWarnings("unchecked")
    --- End diff --
    
    Oh, I forgot the wildcard `?`. Sorry about that.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r196994767
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.config;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.client.SqlClientException;
    +import org.apache.flink.table.descriptors.ClassTypeDescriptor;
    +import org.apache.flink.table.descriptors.ClassTypeValidator;
    +import org.apache.flink.table.descriptors.DescriptorProperties;
    +import org.apache.flink.table.descriptors.FunctionDescriptor;
    +import org.apache.flink.table.descriptors.FunctionValidator;
    +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
    +import org.apache.flink.table.typeutils.TypeStringUtils;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
    +
    +/**
    + * Descriptor for user-defined functions.
    + */
    +public class UDFDescriptor extends FunctionDescriptor {
    +
    +	private static final String FROM = "from";
    +
    +	private From from;
    +
    +	private UDFDescriptor(String name, From from) {
    +		super(name);
    +		this.from = from;
    +	}
    +
    +	public From getFrom() {
    +		return from;
    +	}
    +
    +	/**
    +	 * Create a UDF descriptor with the given config.
    +	 */
    +	public static UDFDescriptor create(Map<String, Object> config) {
    +		if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
    +			throw new SqlClientException("The 'name' attribute of a function is missing.");
    +		}
    +
    +		final Object name = config.get(FunctionValidator.FUNCTION_NAME());
    +		if (!(name instanceof String) || ((String) name).length() <= 0) {
    +			throw new SqlClientException("Invalid function name '" + name + "'.");
    +		}
    --- End diff --
    
    At first, I did use `ConfigUtil.normalizeYaml` for that. But I found that the type information of some parameters was lost in the process, e.g., we could not tell whether a `false` is a boolean or a string.
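    
    The loss of type information can be reproduced without any Flink code. The sketch below is a stand-in under stated assumptions: `StringNormalizationSketch.stringify` is a hypothetical helper that merely calls `String.valueOf` on each value and is not the implementation of `ConfigUtil.normalizeYaml`.
    
    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical stand-in for a string-based normalization step.
    final class StringNormalizationSketch {

        // Converts every value to its string form, as a string-only property map would.
        static Map<String, String> stringify(Map<String, Object> config) {
            Map<String, String> result = new LinkedHashMap<>();
            config.forEach((key, value) -> result.put(key, String.valueOf(value)));
            return result;
        }

        public static void main(String[] args) {
            Map<String, Object> asBoolean = new LinkedHashMap<>();
            asBoolean.put("value", Boolean.FALSE); // parsed from the YAML scalar false

            Map<String, Object> asString = new LinkedHashMap<>();
            asString.put("value", "false");        // parsed from the YAML scalar "false"

            // The typed maps differ ...
            System.out.println(asBoolean.equals(asString));                       // prints false
            // ... but their stringified forms cannot be told apart.
            System.out.println(stringify(asBoolean).equals(stringify(asString))); // prints true
        }
    }
    ```
    
    Once both inputs collapse to the same string, the only way to recover the intended type is an explicit `type` entry next to the value.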


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r196996002
  
    --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ---
    @@ -145,6 +146,68 @@ public void testTableSchema() throws Exception {
     		assertEquals(expectedTableSchema, actualTableSchema);
     	}
     
    +	@Test(timeout = 30_000L)
    +	public void testScalarUDF() throws Exception {
    --- End diff --
    
    I'll add more test cases for that.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r197005582
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.typeutils.TypeStringUtils
    +
    +/**
    +  * Descriptor for a primitive type.
    +  */
    +class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
    +
    +  // TODO not sure if we should the BasicTypeInfo here
    +  var typeInformation: BasicTypeInfo[T] = _
    +  var value: T = _
    +
    +  def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
    --- End diff --
    
    Yes, `TypeStringUtils` can extract every valid type information, and the user can specify the type via the API (e.g., `setType(Types.SHORT)`). I just wonder how the basic types supported by Java can be properly inferred from the config file (e.g., how to decide whether a parameter `1` is a byte, a short, or an int). Although a short or byte value can be represented as an int, the choice affects the constructor lookup via Java reflection. Do you have any ideas for that?
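    
    The reflection concern can be shown in isolation. In the sketch below, `ConstructorLookupSketch` and the nested `ShortOffset` class are hypothetical; the only real API used is `Class.getConstructor`, which matches parameter types exactly and applies no widening or narrowing.
    
    ```java
    import java.lang.reflect.Constructor;

    // Hypothetical example; ShortOffset stands in for a UDF with a short parameter.
    final class ConstructorLookupSketch {

        static final class ShortOffset {
            final short offset;

            public ShortOffset(short offset) {
                this.offset = offset;
            }
        }

        // Returns true if ShortOffset has a public constructor taking exactly this type.
        static boolean hasConstructor(Class<?> parameterType) {
            try {
                Constructor<ShortOffset> c = ShortOffset.class.getConstructor(parameterType);
                return c != null;
            } catch (NoSuchMethodException e) {
                return false;
            }
        }

        public static void main(String[] args) {
            System.out.println(hasConstructor(short.class)); // prints true
            System.out.println(hasConstructor(int.class));   // prints false
        }
    }
    ```
    
    So if the config value `1` is read as an int, a lookup with `int.class` misses the `short` constructor even though the value itself would fit.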


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r198482931
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.config;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.client.SqlClientException;
    +import org.apache.flink.table.descriptors.ClassTypeDescriptor;
    +import org.apache.flink.table.descriptors.ClassTypeValidator;
    +import org.apache.flink.table.descriptors.DescriptorProperties;
    +import org.apache.flink.table.descriptors.FunctionDescriptor;
    +import org.apache.flink.table.descriptors.FunctionValidator;
    +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
    +import org.apache.flink.table.typeutils.TypeStringUtils;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
    +
    +/**
    + * Descriptor for user-defined functions.
    + */
    +public class UDFDescriptor extends FunctionDescriptor {
    +
    +	private static final String FROM = "from";
    +
    +	private From from;
    +
    +	private UDFDescriptor(String name, From from) {
    +		super(name);
    +		this.from = from;
    +	}
    +
    +	public From getFrom() {
    +		return from;
    +	}
    +
    +	/**
    +	 * Create a UDF descriptor with the given config.
    +	 */
    +	public static UDFDescriptor create(Map<String, Object> config) {
    +		if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
    +			throw new SqlClientException("The 'name' attribute of a function is missing.");
    +		}
    +
    +		final Object name = config.get(FunctionValidator.FUNCTION_NAME());
    +		if (!(name instanceof String) || ((String) name).length() <= 0) {
    +			throw new SqlClientException("Invalid function name '" + name + "'.");
    +		}
    --- End diff --
    
    I see. Thanks for the explanation.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r196998269
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Validator for [[PrimitiveTypeDescriptor]].
    +  */
    +class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
    +  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
    +    properties
    +      .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false)
    +    properties
    +      .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1)
    +  }
    +}
    +
    +object PrimitiveTypeValidator {
    +  val PRIMITIVE_TYPE = "type"
    +  val PRIMITIVE_VALUE = "value"
    +
    +  def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
    +    val typeInfo =
    +      properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
    +    val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
    +    val value = typeInfo match {
    +      case basicType: BasicTypeInfo[_] =>
    +        basicType match {
    +          case BasicTypeInfo.INT_TYPE_INFO =>
    +            properties.getInt(valueKey)
    +          case BasicTypeInfo.LONG_TYPE_INFO =>
    +            properties.getLong(valueKey)
    +          case BasicTypeInfo.DOUBLE_TYPE_INFO =>
    +            properties.getDouble(valueKey)
    +          case BasicTypeInfo.STRING_TYPE_INFO =>
    +            properties.getString(valueKey)
    +          case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
    +            properties.getBoolean(valueKey)
    +          //TODO add more types
    --- End diff --
    
    Ah, yes. The array type was not considered. I'll think about that and add the support if it's not hard to implement. Otherwise, we could defer it to a follow-up issue.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r198424133
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.config;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.client.SqlClientException;
    +import org.apache.flink.table.descriptors.ClassTypeDescriptor;
    +import org.apache.flink.table.descriptors.ClassTypeValidator;
    +import org.apache.flink.table.descriptors.DescriptorProperties;
    +import org.apache.flink.table.descriptors.FunctionDescriptor;
    +import org.apache.flink.table.descriptors.FunctionValidator;
    +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
    +import org.apache.flink.table.typeutils.TypeStringUtils;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
    +
    +/**
    + * Descriptor for user-defined functions.
    + */
    +public class UDFDescriptor extends FunctionDescriptor {
    --- End diff --
    
    Sounds good to me.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r198428514
  
    --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.gateway.utils;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.api.Types;
    +import org.apache.flink.table.functions.AggregateFunction;
    +import org.apache.flink.table.functions.ScalarFunction;
    +import org.apache.flink.table.functions.TableFunction;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * A bunch of UDFs for SQL-Client test.
    + */
    +public class UserDefinedFunctions {
    --- End diff --
    
    ok


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194458815
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FunctionDescriptor.scala ---
    @@ -0,0 +1,48 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +/**
    +  * Function Descriptor
    +  */
    +class FunctionDescriptor(var name: String) extends Descriptor {
    --- End diff --
    
    As mentioned before, we should document every method and have more fluent method names.
    
    E.g.
    ```
    Function.using(...)
    ```
    
    I know `Function` is pretty generic but we do the same with `Kafka`, `Filesystem` etc.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194442694
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.config;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.client.SqlClientException;
    +import org.apache.flink.table.descriptors.ClassTypeDescriptor;
    +import org.apache.flink.table.descriptors.ClassTypeValidator;
    +import org.apache.flink.table.descriptors.DescriptorProperties;
    +import org.apache.flink.table.descriptors.FunctionDescriptor;
    +import org.apache.flink.table.descriptors.FunctionValidator;
    +import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
    +import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
    +import org.apache.flink.table.typeutils.TypeStringUtils;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
    +
    +/**
    + * Descriptor for user-defined functions.
    + */
    +public class UDFDescriptor extends FunctionDescriptor {
    +
    +	private static final String FROM = "from";
    +
    +	private From from;
    +
    +	private UDFDescriptor(String name, From from) {
    +		super(name);
    +		this.from = from;
    +	}
    +
    +	public From getFrom() {
    +		return from;
    +	}
    +
    +	/**
    +	 * Create a UDF descriptor with the given config.
    +	 */
    +	public static UDFDescriptor create(Map<String, Object> config) {
    +		if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
    +			throw new SqlClientException("The 'name' attribute of a function is missing.");
    +		}
    +
    +		final Object name = config.get(FunctionValidator.FUNCTION_NAME());
    +		if (!(name instanceof String) || ((String) name).length() <= 0) {
    +			throw new SqlClientException("Invalid function name '" + name + "'.");
    +		}
    --- End diff --
    
    The following lines contain a lot of casting and assumptions about data types. We should handle this the way `Source` does: first convert the incoming YAML to string-based properties using `ConfigUtil.normalizeYaml`. The Table API and the SQL Client can then share one code path for validation, as they already do for sources.
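    
    As a rough illustration of what "string-based properties" means here, the sketch below flattens a nested YAML-like map into dot-separated keys. It is an assumption-laden stand-in: `YamlFlattenSketch`, the key layout, and the sample keys are hypothetical and do not reproduce `ConfigUtil.normalizeYaml`.
    
    ```java
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical flattening of a parsed YAML tree into string properties.
    final class YamlFlattenSketch {

        // Flattens nested maps and lists into dot-separated, string-valued properties.
        static Map<String, String> flatten(Map<String, Object> config) {
            Map<String, String> out = new LinkedHashMap<>();
            flattenInto("", config, out);
            return out;
        }

        private static void flattenInto(String prefix, Object node, Map<String, String> out) {
            if (node instanceof Map) {
                for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                    flattenInto(join(prefix, String.valueOf(e.getKey())), e.getValue(), out);
                }
            } else if (node instanceof List) {
                List<?> list = (List<?>) node;
                for (int i = 0; i < list.size(); i++) {
                    flattenInto(join(prefix, String.valueOf(i)), list.get(i), out);
                }
            } else {
                // Scalars are stored in their string form.
                out.put(prefix, String.valueOf(node));
            }
        }

        private static String join(String prefix, String key) {
            return prefix.isEmpty() ? key : prefix + "." + key;
        }

        public static void main(String[] args) {
            Map<String, Object> parameter = new LinkedHashMap<>();
            parameter.put("type", "INT");
            parameter.put("value", 1);

            Map<String, Object> function = new LinkedHashMap<>();
            function.put("name", "myUDF");
            function.put("constructor", Collections.singletonList(parameter));

            flatten(function).forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }
    ```
    
    With every value reduced to a string under a predictable key, a single validator can check both properties built through the Table API and properties read from the SQL Client's YAML file.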


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194452740
  
    --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java ---
    @@ -145,6 +146,68 @@ public void testTableSchema() throws Exception {
     		assertEquals(expectedTableSchema, actualTableSchema);
     	}
     
    +	@Test(timeout = 30_000L)
    +	public void testScalarUDF() throws Exception {
    --- End diff --
    
    The full integration tests were needed in the beginning, but now we should rely more on dedicated unit tests. Maybe extend one existing ITCase with a function and check the general correctness in a new test class.


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r194451816
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java ---
    @@ -101,6 +110,16 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
     			}
     		});
     
    +		// generate user-defined functions
    +		functions = new HashMap<>();
    +		mergedEnv.getFunctions().forEach((name, descriptor) -> {
    +			DescriptorProperties properties = new DescriptorProperties(true);
    +			descriptor.addProperties(properties);
    +			functions.put(
    +					name,
    +					FunctionValidator.generateUserDefinedFunction(properties, classLoader));
    --- End diff --
    
    Maybe move the generation logic from `FunctionValidator.generateUserDefinedFunction` to some class `FunctionService`?
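
A rough sketch of what such a separation could look like, assuming the service only needs a class name, a class loader, and already-parsed constructor arguments. `FunctionServiceSketch` and `instantiate` are hypothetical names; the real `FunctionService` (if one is introduced) may look quite different and would work on `DescriptorProperties` rather than raw arguments.

```java
import java.lang.reflect.Constructor;

// Hypothetical sketch (not Flink code): keeps instantiation logic out of the validator.
final class FunctionServiceSketch {

    // Loads the class with the given class loader and calls the public constructor
    // whose parameter types exactly match the runtime classes of the arguments.
    // Assumption: no primitive or supertype parameter matching is attempted.
    static Object instantiate(String className, ClassLoader classLoader, Object... args) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            Class<?>[] paramTypes = new Class<?>[args.length];
            for (int i = 0; i < args.length; i++) {
                paramTypes[i] = args[i].getClass();
            }
            Constructor<?> constructor = clazz.getConstructor(paramTypes);
            return constructor.newInstance(args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate '" + className + "'.", e);
        }
    }

    public static void main(String[] args) {
        // java.lang.StringBuilder stands in for a user-defined function class here.
        Object instance = instantiate(
                "java.lang.StringBuilder", ClassLoader.getSystemClassLoader(), "abc");
        System.out.println(instance.getClass().getName() + ": " + instance);
    }
}
```

Keeping this in its own class would leave the validator responsible only for checking properties, while the service owns reflection and class loading.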


---

[GitHub] flink pull request #6090: [FLINK-8863] [SQL] Add user-defined function suppo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6090#discussion_r196996876
  
    --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.client.gateway.utils;
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.table.api.Types;
    +import org.apache.flink.table.functions.AggregateFunction;
    +import org.apache.flink.table.functions.ScalarFunction;
    +import org.apache.flink.table.functions.TableFunction;
    +import org.apache.flink.types.Row;
    +
    +/**
    + * A bunch of UDFs for SQL-Client test.
    + */
    +public class UserDefinedFunctions {
    --- End diff --
    
    The testing functions provided in `flink-table` are mainly intended for functional tests, while the functions added here are mainly intended for testing construction, i.e., I made the UDF constructors complex or even somewhat unreasonable...


---

[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/6090
  
    Hi @twalthr, I've made some changes to the PR.
    1. Add a normalize method to `ClassTypeValidator` that converts a config entry like `constructor.0 = abc` to `constructor.0.type = STRING` and `constructor.0.value = abc`.
    2. Enrich `DescriptorProperties` with more types.
    3. Add a `getListProperties` method that returns all values under keys that form a group list, e.g., `constructor` returns all `constructor.#` entries.
    4. Refine the descriptor APIs.
    
    I know there is still a lot of work to do (e.g., adding more tests, documenting the features, supporting array-typed parameters). I'll continue working on that if I still have some time before the deadline.
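
The normalization in item 1 and the grouped lookup in item 3 can be illustrated with a small sketch over plain string maps. `ConstructorPropertiesSketch` is a hypothetical stand-in, not the PR's `ClassTypeValidator` or `DescriptorProperties`; in particular, returning one sub-map per index from `getListProperties` is an assumption about the result shape.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not the PR's classes), operating on plain string properties.
final class ConstructorPropertiesSketch {

    // Expands shorthand "constructor.N = v" into explicit ".type" and ".value" keys.
    static Map<String, String> normalize(Map<String, String> props) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            // Matches top-level and nested keys that end in "constructor.<index>".
            if (e.getKey().matches("(?:.*\\.)?constructor\\.\\d+")) {
                out.put(e.getKey() + ".type", "STRING");
                out.put(e.getKey() + ".value", e.getValue());
            } else {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    // Returns one sub-map per index (group.0.*, group.1.*, ...), stopping at the first gap.
    static List<Map<String, String>> getListProperties(Map<String, String> props, String group) {
        List<Map<String, String>> result = new ArrayList<>();
        for (int i = 0; ; i++) {
            String prefix = group + "." + i + ".";
            Map<String, String> item = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : props.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    item.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            if (item.isEmpty()) {
                break;
            }
            result.add(item);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("name", "myUdf");
        props.put("constructor.0", "abc");
        props.put("constructor.1.type", "INT");
        props.put("constructor.1.value", "5");
        System.out.println(getListProperties(normalize(props), "constructor"));
    }
}
```

Running the normalization first means the lookup only ever sees the explicit `type`/`value` form, so shorthand and full entries can be mixed in one config.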


---