Posted to issues@flink.apache.org by "Yichao Yang (Jira)" <ji...@apache.org> on 2022/05/26 09:35:00 UTC

[jira] [Created] (FLINK-27799) Version 1.13.5 is not compatible with version 1.10 UDF

Yichao Yang created FLINK-27799:
-----------------------------------

             Summary: Version 1.13.5 is not compatible with version 1.10 UDF
                 Key: FLINK-27799
                 URL: https://issues.apache.org/jira/browse/FLINK-27799
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.13.5
            Reporter: Yichao Yang


In Flink 1.10, the following code works:

 
{code:java}
// UDF
public class SetStringUDF extends ScalarFunction {

//    @DataTypeHint("RAW")
    public Set<String> eval(String input) {
        return Sets.newHashSet(input, input + "_1", input + "_2");
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return TypeInformation.of(new TypeHint<Set<String>>() {
        });
    }

}

public class GetSetValue extends ScalarFunction {

    public String eval(Set<String> set) {
        return set.iterator().next();
    }
}


StreamTableEnvironment.createFunction("set_string", SetStringUDF.class);
StreamTableEnvironment.createFunction("get_set_value", GetSetValue.class);

CREATE TABLE Orders (
    order_id BIGINT NOT NULL,
    name STRING,
    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.name.length' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '10'
);

CREATE TABLE target_table (
    order_id BIGINT NOT NULL,
    name STRING,
    row_time timestamp(3),
    i STRING
) WITH (
  'connector' = 'print'
);
INSERT INTO target_table
SELECT *, cast(get_set_value(set_string(name)) as string) as i
FROM Orders{code}
But in Flink 1.13.5, it throws an exception like:

{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'java.util.Set<java.lang.String>'. Interpreting it as a structured type was also not successful.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
    ... 36 more
Caused by: org.apache.flink.table.api.ValidationException: Class 'java.util.Set' must not be abstract.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
    at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:164)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:479)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
    ... 37 more {code}
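
The second exception in the trace says that 'java.util.Set' "must not be abstract". A minimal plain-Java sketch of that kind of check, independent of Flink, may help show why an interface such as Set is rejected when it is interpreted as a structured type. The class and method names below are hypothetical, and this is not Flink's actual ExtractionUtils code:

```java
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; NOT Flink's ExtractionUtils implementation.
public class AbstractCheckSketch {

    // Returns true when the class is an interface or an abstract class,
    // i.e. it cannot be instantiated directly via a constructor.
    public static boolean isAbstractType(Class<?> clazz) {
        return clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers());
    }

    public static void main(String[] args) {
        // Set is an interface, so a "must not be abstract" style check rejects it.
        System.out.println("Set abstract?     " + isAbstractType(Set.class));
        // HashSet is a concrete class, so this particular check would not reject it.
        System.out.println("HashSet abstract? " + isAbstractType(HashSet.class));
    }
}
```

This only illustrates the abstract-class check named in the message. It does not imply that declaring HashSet<String> in the UDF signature would make type extraction succeed in Flink 1.13.5.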
 

 

I had to change my UDFs as follows to work around this problem:

 
{code:java}
public class GetSetValue extends ScalarFunction {

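    public String eval(@DataTypeHint("RAW") Object set) {
        // The argument is declared as a RAW Object, so cast it back to the expected Set.
        @SuppressWarnings("unchecked")
        Set<String> s = (Set<String>) set;
        return s.iterator().next();
    }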
}

public class SetStringUDF extends ScalarFunction {

    @DataTypeHint("RAW")
    public Object eval(String input) {
        return Sets.newHashSet(input, input + "_1", input + "_2");
    }

}
 {code}
 

 

I have two questions:
 # Is there currently a way to keep these UDFs working without changing the code?
 # If not, we will need to fix all of our UDFs, which is a lot of work. Is there a plan to restore full compatibility in the future?


