Posted to user@flink.apache.org by Morrisa Brenner <mo...@klaviyo.com> on 2019/05/16 21:18:51 UTC

Generic return type on a user-defined scalar function

Hi Flink folks,

In a Flink job using the SQL API that I’m working on, I have a custom POJO
data type with a generic field, and I would like to be able to call a
user-defined function on this field. I included a similar function below
with the business logic stubbed out, but this example has the return type
I'm looking for.

I have no issues using custom functions of this type when they're used in a
select statement and the `getResultType` method is excluded from the
user-defined function class, but I am unable to get the type information to
resolve correctly in contexts like order by and group by statements. Even
when the `getResultType` method explicitly defines the specific type for a
given object, it still doesn't work: the job compiler within Flink seems to
assume that the return type of the `eval` method is just Object (type
erasure...), and it fails to generate the object code because it detects
invalid casts to the desired output type. Without the `getResultType`
method, it fails to detect the type entirely. This seems to be fine when
it's just a select, but if I try to make it do any operation (like group by)
I get the following error:
"org.apache.flink.api.common.InvalidProgramException: This type
(GenericType<java.lang.Object>) cannot be used as key."

Does anyone know if there's a way to get Flink to pay attention to the type
information from `getResultType` when compiling the `eval` method so that
the types work out? Or another way to work around the type erasure on the
eval method without defining explicit user-defined function classes for
each type?

Thanks for your help!

Morrisa



Code snippet:


package flink_generics_testing;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Reads custom values from a table and performs a function on those values.
 * T should be able to be a String, long, float, boolean, or Date.
 *
 * @param <T> The expected type of the table column values.
 */
public class CustomScalarFunction<T> extends ScalarFunction {

    private static final long serialVersionUID = -5537657771138360838L;

    private final Class<T> desiredType;

    /**
     * Construct an instance.
     *
     * @param desiredType The type of the value that we're performing the function on.
     */
    public CustomScalarFunction(Class<T> desiredType) {
        this.desiredType = desiredType;
    }

    public T eval(T value) {
        return value;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return TypeInformation.of(desiredType);
    }

    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return new TypeInformation<?>[] {
                TypeInformation.of(desiredType)
        };
    }
}
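
For context, here is a minimal sketch of how a function like this might be
registered and called; the table name "events", the column "genericField",
and the class and function names in the sketch are placeholders. The plain
select works, but the group by fails with the error above.

package flink_generics_testing;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class GenericUdfUsageSketch {

    /**
     * Assumes a table named "events" with a column "genericField" is already
     * registered in the given environment (both names are placeholders).
     */
    public static Table buildQuery(StreamTableEnvironment tableEnv) {
        // Register the generic UDF, pinning the type parameter to String.
        tableEnv.registerFunction("customFunc", new CustomScalarFunction<>(String.class));

        // A plain SELECT of customFunc(...) compiles and runs, but grouping
        // on the function result fails with:
        // "This type (GenericType<java.lang.Object>) cannot be used as key."
        return tableEnv.sqlQuery(
                "SELECT customFunc(genericField), COUNT(*) "
              + "FROM events "
              + "GROUP BY customFunc(genericField)");
    }
}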


-- 
Morrisa Brenner
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com>

Re: Generic return type on a user-defined scalar function

Posted by Morrisa Brenner <mo...@klaviyo.com>.
Hi JingsongLee and Timo,

Thanks for taking a look and for the feedback!

All the best,
Morrisa


Morrisa Brenner
Software Engineer
━━
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com/>





Re: Generic return type on a user-defined scalar function

Posted by JingsongLee <lz...@aliyun.com>.
Hi Morrisa:

It seems that the Flink planner does not support returning Object (or a generic type that, as you say, gets erased) from a ScalarFunction.
In ScalarFunctionCallGen:
val functionCallCode =
  s"""
    |${parameters.map(_.code).mkString("\n")}
    |$resultTypeTerm $resultTerm = $functionReference.eval(
    |  ${parameters.map(_.resultTerm).mkString(", ")});
    |""".stripMargin
There would need to be an explicit cast of the eval return value to support this situation.
I don't see a way to bypass it. If you can modify the source code, you can change it as follows to support a generic return type:
val functionCallCode =
  s"""
    |${parameters.map(_.code).mkString("\n")}
    |$resultTypeTerm $resultTerm = ($resultTypeTerm) $functionReference.eval(
    |  ${parameters.map(_.resultTerm).mkString(", ")});
    |""".stripMargin
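
To make the failure more concrete, here is a rough, self-contained sketch of
the Java that the template expands to when the result type is String; the
identifier names and the evalErased helper are made up for illustration.

public class GeneratedCodeSketch {

    // Stand-in for the user's eval(T) method after type erasure.
    public static Object evalErased(Object value) {
        return value;
    }

    public static void main(String[] args) {
        Object field$1 = "example value";

        // Without the cast, the planner generates roughly:
        //   java.lang.String result$1 = evalErased(field$1);
        // which the generated-code compiler rejects, because the erased
        // eval returns Object.

        // With the cast added to the template, the assignment compiles:
        java.lang.String result$1 = (java.lang.String) evalErased(field$1);
        System.out.println(result$1);
    }
}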

Best, JingsongLee



Re: Generic return type on a user-defined scalar function

Posted by Timo Walther <tw...@apache.org>.
Hi Morrisa,

usually, this means that your class is not recognized as a POJO. Please
check the POJO requirements again: a default constructor, getters and
setters for every field, etc. You can use
org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify whether your
class is a POJO or not.
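
For example, a quick check along these lines (EventPojo is just a
placeholder standing in for your class; if it does not qualify as a POJO,
Types.POJO(...) throws an exception):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class PojoCheck {

    // Placeholder for the custom data type; a public no-arg constructor and
    // public fields (or getters/setters) are what the POJO rules require.
    public static class EventPojo {
        public String value;
        public EventPojo() {}
    }

    public static void main(String[] args) {
        // Throws an exception if EventPojo is not recognized as a POJO;
        // otherwise it returns the extracted PojoTypeInfo.
        TypeInformation<EventPojo> info = Types.POJO(EventPojo.class);
        System.out.println(info);
    }
}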

I hope this helps.

Regards,
Timo

