Posted to user@flink.apache.org by lk_hadoop <lk...@163.com> on 2023/01/03 01:15:22 UTC

WindowDeduplicate doesn't write results to the sink as expected

hi all,
   I'm using Flink 1.15.3, and I want to use WindowDeduplicate to drop duplicate records.
   My stream table schema is:
(
  `userName` STRING,
  `userMAC` STRING,
  `bssid` STRING,
  `ssid` STRING,
  `apName` STRING,
  `radioID` STRING,
  `vlanid` STRING,
  `action` STRING,
  `ddate` STRING,
  `dtime` STRING,
  `rawValue` STRING,
  `region` STRING,
  `eventTime` TIMESTAMP(3) *ROWTIME*
)
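
(For reference, here is a minimal DDL sketch that would produce a schema like the one above. The connector choice and the 5-second watermark delay are assumptions on my part, since the original table definition isn't shown; the `*ROWTIME*` marker on `eventTime` implies some WATERMARK declaration along these lines:)

```sql
-- Hypothetical DDL sketch: connector and watermark delay are assumed,
-- not taken from the original table definition.
CREATE TABLE aplog (
  `userName` STRING,
  `userMAC` STRING,
  `bssid` STRING,
  `ssid` STRING,
  `apName` STRING,
  `radioID` STRING,
  `vlanid` STRING,
  `action` STRING,
  `ddate` STRING,
  `dtime` STRING,
  `rawValue` STRING,
  `region` STRING,
  `eventTime` TIMESTAMP(3),
  -- This makes eventTime a rowtime attribute (shown as *ROWTIME* in the schema)
  WATERMARK FOR `eventTime` AS `eventTime` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'  -- placeholder; the real source connector is not shown
);
```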
When I try the code below:
TableResult result = tEnv.executeSql(
    "SELECT * FROM TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))");

result.print(); 


I can see the printed result on the console each time a checkpoint completes, and the checkpoint size barely grows.
But when I try WindowDeduplicate with this code:


TableResult result = tEnv.executeSql(
    "SELECT * FROM (" +
    "  SELECT ddate, userMAC, bssid, dtime, userName, apName, action, ssid, rawValue, " +
    "    ROW_NUMBER() OVER (" +
    "      PARTITION BY window_start, window_end, userName ORDER BY eventTime DESC" +
    "    ) AS row_num " +
    "  FROM TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '5' SECONDS))" +
    ") WHERE row_num <= 1");


result.print();


I can't see any printed result on the console, but from the log I can see that the checkpoint size grows after each checkpoint is triggered.
I want to know how I can make the records get written to the sink after each checkpoint completes. Thanks.