Posted to user@flink.apache.org by Morrisa Brenner <mo...@klaviyo.com> on 2019/05/16 21:18:51 UTC
Generic return type on a user-defined scalar function
Hi Flink folks,
In a Flink job using the SQL API that I’m working on, I have a custom POJO
data type with a generic field, and I would like to be able to call a
user-defined function on this field. I included a similar function below
with the business logic stubbed out, but this example has the return type
I'm looking for.
I have no issues using custom functions of this type when they're used in a
select statement and the `getResultType` method is excluded from the
user-defined function class, but I am unable to get the type information to
resolve correctly in contexts like order by and group by statements. It
still doesn't work even if the `getResultType` method defines the specific
type for a given object explicitly, because the job compiler within Flink
seems to assume the return type of the `eval` method is just Object (type
erasure...), and it fails to generate the object code because it detects
invalid casts to the desired output type. Without the `getResultType`
method, it fails to detect the type entirely. This seems to be fine when
it's just a select, but if I try to do any operation (like group by) I get
the following error:
"org.apache.flink.api.common.InvalidProgramException: This type
(GenericType<java.lang.Object>) cannot be used as key."
Does anyone know if there's a way to get Flink to pay attention to the type
information from `getResultType` when compiling the `eval` method so that
the types work out? Or another way to work around the type erasure on the
eval method without defining explicit user-defined function classes for
each type?
Thanks for your help!
Morrisa
Code snippet:
package flink_generics_testing;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Reads custom values from a table and performs a function on those values.
 * T should be able to be a String, long, float, boolean, or Date.
 *
 * @param <T> The expected type of the table column values.
 */
public class CustomScalarFunction<T> extends ScalarFunction {

    private static final long serialVersionUID = -5537657771138360838L;

    private final Class<T> desiredType;

    /**
     * Construct an instance.
     *
     * @param desiredType The type of the value that we're performing the function on.
     */
    public CustomScalarFunction(Class<T> desiredType) {
        this.desiredType = desiredType;
    }

    public T eval(T value) {
        return value;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return TypeInformation.of(desiredType);
    }

    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return new TypeInformation<?>[] {
            TypeInformation.of(desiredType)
        };
    }
}
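[Editor's note: the erasure behavior described in the post can be reproduced with plain JDK reflection, independent of Flink. This is a minimal sketch; the nested `CustomScalarFunction` here is a pared-down stand-in for the class above, not the actual Flink function.]

```java
import java.lang.reflect.Method;

public class ErasureDemo {
    // Pared-down stand-in for the generic function class above.
    static class CustomScalarFunction<T> {
        public T eval(T value) {
            return value;
        }
    }

    public static void main(String[] args) throws Exception {
        // After erasure, the only compiled signature is eval(Object).
        Method eval = CustomScalarFunction.class.getMethod("eval", Object.class);
        // The declared return type is Object, which is all a code
        // generator can see without extra type information.
        System.out.println(eval.getReturnType().getName());  // prints "java.lang.Object"
    }
}
```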
--
Morrisa Brenner
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com>
Re: Generic return type on a user-defined scalar function
Posted by Morrisa Brenner <mo...@klaviyo.com>.
Hi JingsongLee and Timo,
Thanks for taking a look and for the feedback!
All the best,
Morrisa
Morrisa Brenner
Software Engineer
━━
225 Franklin St, Boston, MA 02110
klaviyo.com <https://www.klaviyo.com/>
> On May 21, 2019, at 12:10 AM, JingsongLee <lz...@aliyun.com> wrote:
> [...]
Re: Generic return type on a user-defined scalar function
Posted by JingsongLee <lz...@aliyun.com>.
Hi Morrisa:
It seems that the Flink planner does not support returning Object (or a generic type, i.e. type erasure, as you say) from a ScalarFunction.
In ScalarFunctionCallGen:

    val functionCallCode =
      s"""
         |${parameters.map(_.code).mkString("\n")}
         |$resultTypeTerm $resultTerm = $functionReference.eval(
         |  ${parameters.map(_.resultTerm).mkString(", ")});
         |""".stripMargin

There would need to be a coercive cast of the eval return value to support this situation.
I see no way to bypass it. If you can modify the source code, you can change it as follows to support a generic return type:

    val functionCallCode =
      s"""
         |${parameters.map(_.code).mkString("\n")}
         |$resultTypeTerm $resultTerm = ($resultTypeTerm) $functionReference.eval(
         |  ${parameters.map(_.resultTerm).mkString(", ")});
         |""".stripMargin
Best, JingsongLee
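[Editor's note: the effect of the coercive cast in the patched snippet can be illustrated with plain Java. `erasedEval` below is a hypothetical stand-in for the erased eval call in the generated code, not Flink's actual API.]

```java
public class CastDemo {
    // Hypothetical stand-in for an erased eval call: after erasure the
    // declared return type is Object, whatever T originally was.
    static Object erasedEval(Object value) {
        return value;
    }

    public static void main(String[] args) {
        // Without a cast, assigning the Object result to a typed local
        // does not compile:
        // String s = erasedEval("hello");   // error: incompatible types
        String s = (String) erasedEval("hello");  // the coercive cast the patch inserts
        System.out.println(s);  // prints "hello"
    }
}
```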
------------------------------------------------------------------
From: Timo Walther <tw...@apache.org>
Send Time: May 20, 2019 (Monday) 23:03
To: user <us...@flink.apache.org>
Subject: Re: Generic return type on a user-defined scalar function
Re: Generic return type on a user-defined scalar function
Posted by Timo Walther <tw...@apache.org>.
Hi Morrisa,
usually, this means that your class is not recognized as a POJO. Please
check the requirements for a POJO again: default constructor, getters and
setters for every field, etc. You can use
org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify whether your
class is a POJO or not.
I hope this helps.
Regards,
Timo
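[Editor's note: a class shaped to satisfy the rules Timo lists would look roughly like the illustrative example below (not taken from the thread). If it meets the requirements, Types.POJO(MyPojo.class) should accept it rather than throw.]

```java
// Illustrative class meeting the POJO requirements Timo lists: public,
// a public no-argument constructor, and a getter/setter pair per field.
public class MyPojo {
    private String label;
    private long count;

    public MyPojo() {}  // default constructor, required for POJO recognition

    public String getLabel() { return label; }
    public void setLabel(String label) { this.label = label; }

    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
}
```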