Posted to user@flink.apache.org by lk_hadoop <lk...@163.com> on 2023/01/03 01:15:22 UTC
WindowDeduplicate didn't write results to the sink as expected
Hi all,
I'm using Flink 1.15.3, and I want to use Window Deduplication to drop duplicate records.
My stream table schema is:
(
`userName` STRING,
`userMAC` STRING,
`bssid` STRING,
`ssid` STRING,
`apName` STRING,
`radioID` STRING,
`vlanid` STRING,
`action` STRING,
`ddate` STRING,
`dtime` STRING,
`rawValue` STRING,
`region` STRING,
`eventTime` TIMESTAMP(3) *ROWTIME*
)
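For context, the `eventTime` column is the table's event-time attribute (shown as *ROWTIME* above), which in Flink SQL normally comes from a WATERMARK declaration in the table DDL. A minimal sketch of such a DDL is below; the watermark delay and the connector options are placeholders for illustration, not my actual setup:

```sql
CREATE TABLE aplog (
  `userName` STRING,
  `userMAC` STRING,
  -- ... remaining STRING columns omitted for brevity ...
  `eventTime` TIMESTAMP(3),
  -- the watermark makes eventTime a ROWTIME attribute;
  -- the 5-second delay here is only an example value
  WATERMARK FOR `eventTime` AS `eventTime` - INTERVAL '5' SECOND
) WITH (
  -- connector options are placeholders
  'connector' = '...'
);
```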
When I try the code below:
TableResult result = tEnv.executeSql("select * from TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))");
result.print();
I can see the printed results on the console each time a checkpoint completes, and the checkpoint size barely grows.
But when I try Window Deduplication with this code:
TableResult result = tEnv.executeSql("select * from " +
"(select ddate, userMAC, bssid, dtime, userName, apName, action, ssid, rawValue, ROW_NUMBER() OVER (" +
"PARTITION BY window_start, window_end, userName ORDER BY eventTime DESC" +
") as row_num from " +
"TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '5' SECONDS))" +
") where row_num <= 1");
result.print();
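For readability, here is the same query written as a single SQL statement (equivalent to the Java string above, assuming I made no copy mistakes). As far as I understand from the docs, Flink recognizes this pattern as Window Deduplication only when the filter is `row_num = 1` or `row_num <= 1` and the ORDER BY is on the time attribute:

```sql
SELECT ddate, userMAC, bssid, dtime, userName, apName, action, ssid, rawValue
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      -- keep the latest row per user within each tumbling window
      PARTITION BY window_start, window_end, userName
      ORDER BY eventTime DESC
    ) AS row_num
  FROM TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '5' SECONDS))
)
WHERE row_num <= 1;
```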
I can't see any printed results on the console, but from the logs I can see that the checkpoint size grows after each checkpoint is triggered.
I want to know how I can make the records get written to the sink after each checkpoint completes. Thanks.