Posted to user@flink.apache.org by Xinyi Yan <ya...@apache.org> on 2022/11/02 22:59:20 UTC
Question about UDF randomly processed input row twice
Hi all,
I found some odd UDF behavior: a single thread invokes the UDF twice for the
same row. See FLINK-29855 <https://issues.apache.org/jira/browse/FLINK-29855> for
more details. I created a datagen table that produces a random integer (1
row per second) and passed this value into the UDF. Inside the UDF, I take
the input modulo 2, convert the integer to a byte array, and log it for
debugging purposes. As the log below shows, the UDF was called twice for
some of the rows. I am not sure whether this duplicated UDF call is
expected, or why it does not happen for every row. In case the local
environment matters: my local Flink cluster has only 1 task manager and
1 task slot.
Thanks!
UDF
public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    // Even input: log the value and return its bytes.
    if (intputNum % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### ", results, intputNum);
        return results;
    }
    // Odd input: log the value and return null, so IS NOT NULL drops the row.
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}
Main class DDLs
tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
tEnv.executeSql("CREATE TABLE datagenTable (\n" +
        " id INT\n" +
        ") WITH (\n" +
        " 'connector' = 'datagen',\n" +
        " 'number-of-rows' = '100',\n" +
        " 'rows-per-second' = '1'\n" +
        ")");
tEnv.executeSql("CREATE TABLE print_table (\n" +
        " id_in_bytes VARBINARY,\n" +
        " id INT\n" +
        ") WITH (\n" +
        " 'connector' = 'print'\n" +
        ")");
tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL");
Logging
2022-11-02 13:38:58,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:59,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
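For anyone reading the log: the "input bytes" are simply the UTF-8 codes of the decimal digits of "num" (45 is the minus sign), which is what the UDF's toString().getBytes(...) produces. A minimal sketch in plain Java to check that; the class and method names here are made up for illustration and are not part of the original job:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LogBytes {
    // Same conversion as in the UDF: decimal string of the number, encoded as UTF-8,
    // rendered the way the log shows a byte array.
    static String render(int n) {
        return Arrays.toString(Integer.toString(n).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(render(1506311954));  // [49, 53, 48, 54, 51, 49, 49, 57, 53, 52]
        System.out.println(render(-1800690437)); // [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55]
    }
}
```

So two log lines with identical bytes and num really do carry the same input value.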
Re: Question about UDF randomly processed input row twice
Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Thanks for your explanation. The execution plan for the SQL `INSERT INTO print_table SELECT * FROM ( SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL` is:
`
StreamPhysicalSink(table=[default_catalog.default_database.print_table], fields=[id_in_bytes, id])
StreamPhysicalCalc(select=[RandomUdf(id) AS id_in_bytes, id], where=[IS NOT NULL(RandomUdf(id))])
StreamPhysicalTableSourceScan(table=[[default_catalog, default_database, datagenTable]], fields=[id])
`
From the plan, we can see that the StreamPhysicalCalc calls the UDF twice: once in the select list and once in the where condition. As a result, a single row appears to be processed twice.
Best regards,
Yuxia
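To make the effect of that plan concrete, here is a rough plain-Java simulation. It is not Flink code, and the class and method names are hypothetical. It assumes the Calc evaluates the where expression first and the select expression only for rows that pass, each with its own UDF call. Under that assumption, inputs for which the UDF returns null cost one call, and inputs for which it returns bytes cost two, which would match the first log: the "###" (even) lines appear twice and the "***" (odd) lines once.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class DuplicatedCalcCall {
    static int calls = 0;

    // Stand-in for the UDF in this thread: bytes for even input, null for odd input.
    static byte[] udf(Integer n) {
        calls++;
        return (n % 2 == 0) ? n.toString().getBytes(StandardCharsets.UTF_8) : null;
    }

    // Mimics Calc(select=[udf(id), id], where=[IS NOT NULL(udf(id))]).
    static List<Object[]> calc(List<Integer> ids, Function<Integer, byte[]> f) {
        List<Object[]> out = new ArrayList<>();
        for (Integer id : ids) {
            if (f.apply(id) != null) {                   // first call: where condition
                out.add(new Object[] {f.apply(id), id}); // second call: select list
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> rows = calc(List.of(1, 2, 3, 4), DuplicatedCalcCall::udf);
        // Odd ids: 1 call each. Even ids: 2 calls each.
        System.out.println("rows emitted: " + rows.size() + ", udf calls: " + calls);
        // prints "rows emitted: 2, udf calls: 6"
    }
}
```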
Re: Question about UDF randomly processed input row twice
Posted by Xinyi Yan <ya...@apache.org>.
Ok. Datagen with the sequence option reproduces this issue easily, and it
also produces an incorrect result. I have datagen generate a sequence from
1 to 5 and let the UDF randomly return either null or bytes. Surprisingly,
not only was the UDF executed twice, but the where clause also failed to
apply `IS NOT NULL`. This came as a big shock to me: the `IS NOT NULL`
condition in a where clause is a fundamental SQL feature, and it should not
break. I have added my findings to FLINK-29855
<https://issues.apache.org/jira/browse/FLINK-29855>, and here are the repro
steps:
Query:
INSERT INTO print_table
SELECT * FROM (
    SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable
)
AS ET WHERE ET.`id_in_bytes` IS NOT NULL
Result:
+I[null, 1]
+I[[50], 2]
+I[null, 4]
UDF
public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    // Random integer in [1, 9]; it decides whether this call returns bytes or null.
    int randomNumber = ((int) (Math.random() * (10 - 1))) + 1;
    LOG.info("[*][*][*] input num is {} and random number is {}. [*][*][*]", intputNum, randomNumber);
    if (randomNumber % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### ", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}
Log:
2022-11-02 13:38:56,765 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:56,766 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:57,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:57,763 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:59,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:02,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:02,762 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:03,758 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
2022-11-02 13:39:04,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
2022-11-02 13:39:05,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
2022-11-02 13:39:06,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
2022-11-02 13:39:07,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:07,763 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:08,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
2022-11-02 13:39:09,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
2022-11-02 13:39:10,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
2022-11-02 13:39:11,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
2022-11-02 13:39:12,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.
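The null rows in the result above are what one would expect if the where condition and the select list each make their own call to a UDF that is not deterministic. Below is a small plain-Java sketch of that mechanism. It is not Flink code: the class, the method names, and the scripted "random" results are all hypothetical, chosen so the run is reproducible. It contrasts calling the function twice per row with calling it once and reusing the value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class NonDeterministicFilter {
    // Hypothetical stand-in for RandomUdf: replays a fixed script of results,
    // ignoring its input, so the "random" outcomes are reproducible.
    static Function<Integer, String> scripted(String... results) {
        Iterator<String> it = Arrays.asList(results).iterator();
        return id -> it.next();
    }

    // Where condition and select list each call the function.
    static List<String> evalTwice(List<Integer> ids, Function<Integer, String> f) {
        List<String> out = new ArrayList<>();
        for (Integer id : ids) {
            if (f.apply(id) != null) {                       // call used by the filter
                out.add("+I[" + f.apply(id) + ", " + id + "]"); // separate call for the output
            }
        }
        return out;
    }

    // One call per row; the same value is filtered and emitted.
    static List<String> evalOnce(List<Integer> ids, Function<Integer, String> f) {
        List<String> out = new ArrayList<>();
        for (Integer id : ids) {
            String v = f.apply(id);
            if (v != null) {
                out.add("+I[" + v + ", " + id + "]");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Row 1: filter call sees "x", output call sees null. Row 2: both see "y".
        System.out.println(evalTwice(List.of(1, 2), scripted("x", null, "y", "y")));
        // prints [+I[null, 1], +I[y, 2]]
        System.out.println(evalOnce(List.of(1, 2), scripted("x", null)));
        // prints [+I[x, 1]]
    }
}
```

On the Flink side, ScalarFunction inherits isDeterministic(), which returns true by default. Whether overriding it to return false stops the planner from duplicating the call in this query is something I have not verified here; FLINK-29855 is the place to check.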
Re: Question about UDF randomly processed input row twice
Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
The datagen source may produce rows with the same values.
As I see it, a UDF in Flink should not process one row twice; if it does, that would be a critical bug.
Best regards,
Yuxia