Posted to issues@flink.apache.org by "Yichao Yang (Jira)" <ji...@apache.org> on 2022/05/26 09:35:00 UTC
[jira] [Created] (FLINK-27799) Version 1.13.5 is not compatible with version 1.10 UDF
Yichao Yang created FLINK-27799:
-----------------------------------
Summary: Version 1.13.5 is not compatible with version 1.10 UDF
Key: FLINK-27799
URL: https://issues.apache.org/jira/browse/FLINK-27799
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.13.5
Reporter: Yichao Yang
In Flink 1.10, the following code works:
{code:java}
// UDF that returns a Set<String>; the result type is declared via getResultType.
public class SetStringUDF extends ScalarFunction {
    // @DataTypeHint("RAW")
    public Set<String> eval(String input) {
        return Sets.newHashSet(input, input + "_1", input + "_2");
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return TypeInformation.of(new TypeHint<Set<String>>() {
        });
    }
}

// UDF that takes the Set<String> produced above and returns one element.
public class GetSetValue extends ScalarFunction {
    public String eval(Set<String> set) {
        return set.iterator().next();
    }
}

// Register both UDFs.
StreamTableEnvironment.createFunction("set_string", SetStringUDF.class);
StreamTableEnvironment.createFunction("get_set_value", GetSetValue.class);

-- SQL job that uses the UDFs
CREATE TABLE Orders (
    order_id BIGINT NOT NULL,
    name STRING,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.name.length' = '1',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '10'
);

CREATE TABLE target_table (
    order_id BIGINT NOT NULL,
    name STRING,
    row_time timestamp(3),
    i STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO target_table
SELECT *, cast(get_set_value(set_string(name)) as string) as i
FROM Orders
{code}
In Flink 1.13.5, however, the same code throws an exception like this:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'java.util.Set<java.lang.String>'. Interpreting it as a structured type was also not successful.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
... 36 more
Caused by: org.apache.flink.table.api.ValidationException: Class 'java.util.Set' must not be abstract.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:164)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:479)
at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
... 37 more {code}
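The second cause in the trace ("Class 'java.util.Set' must not be abstract") can be illustrated without Flink. java.util.Set is an interface, so reflection reports it as abstract, while a concrete class such as HashSet is not. The snippet below is only a minimal sketch of that kind of check. The class AbstractTypeCheck and the helper isAbstractType are hypothetical names for illustration, and this is not Flink's actual validateStructuredClass implementation.

```java
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Set;

// Illustrative only: NOT Flink code, just the same kind of reflection check.
final class AbstractTypeCheck {

    // Hypothetical helper: true if the class is an interface or an abstract
    // class, i.e. it cannot be instantiated directly.
    static boolean isAbstractType(Class<?> clazz) {
        return Modifier.isAbstract(clazz.getModifiers());
    }

    public static void main(String[] args) {
        // Interfaces always carry the ABSTRACT modifier.
        System.out.println("Set abstract?     " + isAbstractType(Set.class));
        // HashSet is a concrete class.
        System.out.println("HashSet abstract? " + isAbstractType(HashSet.class));
    }
}
```

Under this reading, declaring the UDF signature with Set<String> gives the extractor an abstract type that it cannot treat as a structured type, which would match the error message above.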
I had to change my UDFs as follows to work around the problem:
{code:java}
public class GetSetValue extends ScalarFunction {
    public String eval(@DataTypeHint("RAW") Object set) {
        Set<String> s = (Set<String>) set;
        return s.iterator().next();
    }
}

public class SetStringUDF extends ScalarFunction {
    @DataTypeHint("RAW")
    public Object eval(String input) {
        return Sets.newHashSet(input, input + "_1", input + "_2");
    }
}
{code}
I have two questions:
# Is there currently a way to stay compatible with the 1.10 behavior without changing the UDF code?
# If not, we will need to fix all of our UDFs, which is a lot of work. Is there a plan to restore full compatibility in the future?
--
This message was sent by Atlassian Jira
(v8.20.7#820007)