Posted to user@flink.apache.org by Xinyi Yan <ya...@apache.org> on 2022/11/02 22:59:20 UTC

Question about UDF randomly processed input row twice

Hi all,
I found some odd UDF behavior: a single thread sometimes invokes the UDF
twice for the same row; see FLINK-29855
<https://issues.apache.org/jira/browse/FLINK-29855> for more details. I
created a datagen table that produces a random integer (1 row per second)
and passed that value into the UDF. Inside the UDF, I take the input modulo
2, convert the integer to a byte array, and log both for debugging. As the
log below shows, the UDF is called twice for some rows. I'm not sure whether
this duplicated UDF call is expected, or why it does not happen consistently
for every row. In case the local environment matters: my local Flink cluster
has only 1 task manager and 1 task slot.

Thanks!

UDF

public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    if (intputNum % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}


Main class DDLs

tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
tEnv.executeSql("CREATE TABLE datagenTable (\n" +
                "    id  INT\n" +
                ") WITH (\n" +
                "    'connector' = 'datagen',\n" +
                "    'number-of-rows' = '100',\n" +
                "    'rows-per-second' = '1'\n" +
                ")");
tEnv.executeSql("CREATE TABLE print_table (\n" +
                "    id_in_bytes  VARBINARY,\n" +
                "    id  INT\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");
tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL");


Logging

2022-11-02 13:38:58,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:58,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:59,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###

Re: Question about UDF randomly processed input row twice

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Thanks for your explanation. The execution plan for the SQL `INSERT INTO print_table SELECT * FROM ( SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL` is:

StreamPhysicalSink(table=[default_catalog.default_database.print_table], fields=[id_in_bytes, id])
StreamPhysicalCalc(select=[RandomUdf(id) AS id_in_bytes, id], where=[IS NOT NULL(RandomUdf(id))])
StreamPhysicalTableSourceScan(table=[[default_catalog, default_database, datagenTable]], fields=[id])

From the plan, we can see that the UDF appears twice in the StreamPhysicalCalc: once in the select list and once in the where condition. As a result, it seems a single row can be processed twice.
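
A minimal plain-Java sketch of what this plan shape implies; it is not Flink's generated code. It assumes the Calc evaluates the where predicate first and evaluates the select expression only for rows that pass. The class and method names (DuplicatedUdfCallSketch, callsPerRow) are hypothetical, and a counter stands in for the LOG.info lines. Under that assumption, a row for which the UDF returns non-null costs two calls and a filtered-out row costs one, which would be consistent with the original log, where only the even inputs are logged twice.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DuplicatedUdfCallSketch {
    // Number of eval() invocations for the current row.
    static int calls = 0;

    // Same logic as IntInputUdf in the thread: bytes for even input, null for odd.
    static byte[] eval(Integer inputNum) {
        calls++;
        byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
        return (inputNum % 2 == 0) ? results : null;
    }

    // Hypothetical stand-in for the Calc node: predicate first, projection
    // only for rows that pass. Returns the number of UDF calls per input row.
    static List<Integer> callsPerRow(int[] ids) {
        List<Integer> perRow = new ArrayList<>();
        for (int id : ids) {
            calls = 0;
            if (eval(id) != null) {          // where=[IS NOT NULL(udf(id))]
                byte[] idInBytes = eval(id); // select=[udf(id) AS id_in_bytes, id]
                // A real sink would emit (idInBytes, id) here.
            }
            perRow.add(calls);
        }
        return perRow;
    }

    public static void main(String[] args) {
        // First four inputs from the log in the original post.
        int[] ids = {1506311954, -1800690437, 1428877483, -1794263686};
        System.out.println(callsPerRow(ids)); // prints [2, 1, 1, 2]
    }
}
```

For a deterministic UDF like this one, the second call only costs time. If the UDF returns different values on each call, the predicate and the projection can disagree.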

Best regards, 
Yuxia 



Re: Question about UDF randomly processed input row twice

Posted by Xinyi Yan <ya...@apache.org>.
Ok. The datagen connector with the sequence option reproduces this issue
easily, and it also produces an incorrect result. I used a sequence
generated by datagen that runs from 1 to 5 and let the UDF randomly return
either null or bytes. Surprisingly, not only was the UDF executed twice, but
the WHERE clause also failed to apply `IS NOT NULL`. This was a big shock to
me: the `IS NOT NULL` condition in a WHERE clause is a fundamental SQL
feature and it should not break. I have added my findings to FLINK-29855
<https://issues.apache.org/jira/browse/FLINK-29855>; here are the repro
steps:

Query:

INSERT INTO print_table
     SELECT * FROM (
           SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable
     )
AS ET WHERE ET.`id_in_bytes` IS NOT NULL


Result:

+I[null, 1]
+I[[50], 2]
+I[null, 4]

UDF

public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    int randomNumber = ((int) (Math.random() * (10 - 1))) + 1;
    LOG.info("[*][*][*] input num is {} and random number is {}. [*][*][*]", intputNum, randomNumber);
    if (randomNumber % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}
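
One way the result above could arise, sketched in plain Java rather than Flink code: assume the WHERE predicate and the SELECT list each call RandomUdf independently, so the value that passed the IS NOT NULL check is not the value that gets emitted. The class and method names (NonDeterministicFilterSketch, evaluateTwice, evaluateOnce) are hypothetical, and java.util.Random stands in for Math.random().

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class NonDeterministicFilterSketch {
    // Stand-in for RandomUdf: the input's UTF-8 bytes or null, chosen at random.
    static byte[] randomUdf(int id, Random rng) {
        return rng.nextBoolean()
                ? Integer.toString(id).getBytes(StandardCharsets.UTF_8)
                : null;
    }

    // Assumed plan shape: the predicate and the projection each call the UDF.
    static List<String> evaluateTwice(int[] ids, Random rng) {
        List<String> out = new ArrayList<>();
        for (int id : ids) {
            if (randomUdf(id, rng) != null) {          // WHERE ... IS NOT NULL
                byte[] projected = randomUdf(id, rng); // SELECT: a fresh random draw
                out.add("+I[" + Arrays.toString(projected) + ", " + id + "]");
            }
        }
        return out;
    }

    // Contrast: call the UDF once per row and reuse the value for both steps.
    static List<String> evaluateOnce(int[] ids, Random rng) {
        List<String> out = new ArrayList<>();
        for (int id : ids) {
            byte[] value = randomUdf(id, rng);
            if (value != null) {
                out.add("+I[" + Arrays.toString(value) + ", " + id + "]");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 3, 4, 5};
        System.out.println("twice: " + evaluateTwice(ids, new Random()));
        System.out.println("once:  " + evaluateOnce(ids, new Random()));
    }
}
```

In evaluateTwice a row such as +I[null, 1] can appear even though the predicate held for the first draw; evaluateOnce never emits a null first field.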


Log:

2022-11-02 13:38:56,765 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:56,766 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:57,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:57,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:58,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:58,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:59,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:02,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:02,762 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:03,758 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
2022-11-02 13:39:04,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
2022-11-02 13:39:05,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
2022-11-02 13:39:06,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
2022-11-02 13:39:07,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:07,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:08,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
2022-11-02 13:39:09,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
2022-11-02 13:39:10,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
2022-11-02 13:39:11,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
2022-11-02 13:39:12,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.



Re: Question about UDF randomly processed input row twice

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
The datagen connector may produce rows with the same values.

From my understanding, a UDF in Flink should not process the same row twice; if it does, that would be a critical bug.

Best regards, 
Yuxia 

