Posted to user@flink.apache.org by "Vararu, Vadim" <va...@adswizz.com> on 2023/05/10 08:20:50 UTC
S3 Parquet files rolling on event not working because PartFileInfo.getSize() does not increase.
Hi all,
I'm trying to set up an S3 Parquet bulk writer with a file roll policy based on a size limit in addition to checkpoints. For that I've extended CheckpointRollingPolicy and overridden shouldRollOnEvent to return true when the part size exceeds the limit.
The problem is that the part size returned by PartFileInfo.getSize() is always 4. It never changes.
Is this a misconfiguration somewhere, or is size-based rolling simply not supported for S3 Parquet files?
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

@Slf4j
public class FileSizeAndOnCheckpointRollingPolicy extends CheckpointRollingPolicy<CloudEventAvro, String> {

    private final long rollingFileSize;

    public FileSizeAndOnCheckpointRollingPolicy(long rollingFileSize) {
        this.rollingFileSize = rollingFileSize;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, CloudEventAvro element)
            throws IOException {
        // This always logs "Part size: 4", no matter how many records have been written.
        log.info("Part size: {}, rolling file size: {}", partFileState.getSize(), rollingFileSize);
        return partFileState.getSize() > rollingFileSize;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
        return false;
    }
}
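To make the failure mode concrete: the roll condition in shouldRollOnEvent reduces to a plain size comparison, so if getSize() is stuck at 4 bytes it can never exceed any realistic threshold and the policy never fires. A minimal standalone sketch of that comparison (RollCheck and the 128 MiB limit are hypothetical illustrations, not Flink API):

```java
// Hypothetical standalone mirror of the policy's roll condition (not Flink API).
public class RollCheck {

    // Returns true once the reported part size exceeds the configured limit.
    public static boolean shouldRoll(long partSize, long rollingFileSize) {
        return partSize > rollingFileSize;
    }

    public static void main(String[] args) {
        long limit = 128L * 1024 * 1024; // example: a 128 MiB roll threshold

        // getSize() reportedly stays at 4, so the condition can never become true:
        System.out.println(shouldRoll(4, limit));      // prints "false"

        // Only a size that actually grows past the limit would trigger a roll:
        System.out.println(shouldRoll(limit + 1, limit)); // prints "true"
    }
}
```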