Posted to dev@flink.apache.org by "Xinyi Yan (Jira)" <ji...@apache.org> on 2022/11/02 21:37:00 UTC

[jira] [Created] (FLINK-29855) UDF randomly processed input data twice

Xinyi Yan created FLINK-29855:
---------------------------------

             Summary: UDF randomly processed input data twice 
                 Key: FLINK-29855
                 URL: https://issues.apache.org/jira/browse/FLINK-29855
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.14.4
            Reporter: Xinyi Yan
         Attachments: IntInputUdf.java, SpendReport.java, example.log

 

To reproduce the issue:
 # Create a datagen table with a single INT column, id.
 # Create a UDF that only computes its input modulo 2 (returning the input's bytes for even values and null for odd ones) and logs every call.
 # Create a print table that prints the results.
 # Insert into the print table the UDF result for the id column of the datagen table, keeping only rows where that result is not null.

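The logs show that some inputs are processed twice, which I assume is not expected. In the output below, every even input (for which the UDF returns a non-null value) is logged twice, while every odd input is logged only once. If a record can be processed twice, the behavior of a UDF can change completely, for example when the UDF has side effects. I have also attached the main and UDF classes, as well as the log file, for additional information.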

 

Main class (DDL and INSERT)

 
{code:java}
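import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SpendReport {

    public static void main(String[] args) throws Exception {
        // Streaming mode is the default for EnvironmentSettings.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Register the scalar UDF under test.
        tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");

        // Source: 100 random INT values, one row per second.
        tEnv.executeSql("CREATE TABLE datagenTable (\n" +
                "    id  INT\n" +
                ") WITH (\n" +
                "    'connector' = 'datagen',\n" +
                "    'number-of-rows' = '100',\n" +
                "    'rows-per-second' = '1'\n" +
                ")");

        // Sink: print every row to stdout.
        tEnv.executeSql("CREATE TABLE print_table (\n" +
                "    id_in_bytes  VARBINARY,\n" +
                "    id  INT\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");

        // The UDF result is projected in the subquery and filtered on in the outer WHERE clause.
        tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL");
    }
}  {code}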
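The log pattern (only even inputs, whose UDF result is non-null and which therefore pass the WHERE clause, appear twice) is consistent with one unconfirmed hypothesis: that the planner inlines the subquery so `IntInputUdf(id)` is evaluated once for the `IS NOT NULL` filter and once more for the projected `id_in_bytes` column, rather than being computed once and reused. The plain-Java sketch below models only that hypothesis; `InlinedCalcSimulation`, `udf` and `run` are hypothetical names, and no Flink API is involved.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (no Flink APIs): the UDF call is evaluated once by the
// WHERE filter and again by the projection, instead of being computed once.
public class InlinedCalcSimulation {

    // Number of UDF calls seen per input value.
    static final Map<Integer, Integer> CALLS = new HashMap<>();

    // Stand-in for IntInputUdf.eval: UTF-8 bytes for even inputs, null for odd ones.
    static byte[] udf(int inputNum) {
        CALLS.merge(inputNum, 1, Integer::sum);
        return inputNum % 2 == 0
                ? Integer.toString(inputNum).getBytes(StandardCharsets.UTF_8)
                : null;
    }

    // Models: SELECT udf(id) AS id_in_bytes, id ... WHERE udf(id) IS NOT NULL
    // with the expression duplicated into both clauses.
    static List<Object[]> run(int[] ids) {
        List<Object[]> out = new ArrayList<>();
        for (int id : ids) {
            if (udf(id) != null) {                   // filter evaluates the UDF
                out.add(new Object[] {udf(id), id}); // projection evaluates it again
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A few ids taken from the log in this issue: two even, two odd.
        int[] ids = {-1324836502, -1800690437, 1085456542, 1428877483};
        List<Object[]> rows = run(ids);
        System.out.println("rows emitted: " + rows.size());
        CALLS.forEach((id, n) -> System.out.println(id + " -> " + n + " call(s)"));
    }
}
{code}

With these sample ids, two rows are emitted and each even id is counted twice, mirroring the log. To check the hypothesis against the real job, the optimized plan of the INSERT statement could be printed with `tEnv.explainSql(...)` and inspected for whether `IntInputUdf` appears in both the projection and the filter condition.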
 

UDF

 
{code:java}
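package org.apache.flink.playgrounds.spendreport;
import java.nio.charset.StandardCharsets;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IntInputUdf extends ScalarFunction {
  private static final Logger LOG = LoggerFactory.getLogger(IntInputUdf.class);
  // Returns the UTF-8 bytes of even inputs and null for odd inputs; every call is logged.
  public @DataTypeHint("BYTES") byte[] eval(@DataTypeHint("INT") Integer inputNum) {
    byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
    if (inputNum % 2 == 0) {
      LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, inputNum);
      return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, inputNum);
    return null;
  }
} {code}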
Log output

 

 
{code:java}
2022-11-02 13:38:56,765 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:56,766 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:57,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:57,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:58,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:58,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:59,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:02,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:02,762 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:03,758 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
2022-11-02 13:39:04,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
2022-11-02 13:39:05,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
2022-11-02 13:39:06,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
2022-11-02 13:39:07,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:07,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:08,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
2022-11-02 13:39:09,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
2022-11-02 13:39:10,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
2022-11-02 13:39:11,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
2022-11-02 13:39:12,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.
{code}
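Counting log entries per input makes the duplication claim easy to verify on the full attached example.log. The sketch below is a hypothetical plain-Java helper (`DuplicateCallCounter` is not part of the attachments or of Flink). It assumes only that each UDF call logs one entry containing `and num <n>`, as both `LOG.info` calls in the UDF do, and it tolerates several entries fused onto one line.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts IntInputUdf invocations per input number by scanning
// log text for the "and num <n>" fragment that both LOG.info calls in the UDF emit.
public class DuplicateCallCounter {

    private static final Pattern NUM = Pattern.compile("and num (-?\\d+)");

    // Returns input -> number of logged calls, in first-seen order.
    static Map<Integer, Integer> countCalls(List<String> logLines) {
        Map<Integer, Integer> counts = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = NUM.matcher(line);
            // Loop over find(): several log entries may be fused onto one line.
            while (m.find()) {
                counts.merge(Integer.parseInt(m.group(1)), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // Pass the path of a log file; otherwise a few shortened entries
        // copied from the log in this issue are used.
        List<String> lines = args.length > 0
                ? Files.readAllLines(Paths.get(args[0]))
                : Arrays.asList(
                        "[] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.",
                        "[] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.",
                        "[] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.");
        // Report only inputs that were logged more than once.
        countCalls(lines).forEach((num, calls) -> {
            if (calls > 1) {
                System.out.println(num + " logged " + calls + " times, even=" + (num % 2 == 0));
            }
        });
    }
}
{code}

Run with the log path as the first argument; with no argument it scans the three sample entries and prints `-1324836502 logged 2 times, even=true`.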
 

 


