Posted to user@flink.apache.org by "Vararu, Vadim" <va...@adswizz.com> on 2023/05/10 08:20:50 UTC

S3 Parquet files rolling on event not working because PartFileInfo.getSize() does not increase.

Hi all,

I'm trying to set up an S3 Parquet bulk writer with a rolling policy based on a size limit in addition to checkpoints. For that I've extended CheckpointRollingPolicy and overridden shouldRollOnEvent to return true when the part size exceeds the limit.

The problem is that the part size returned by PartFileInfo.getSize() is always 4 and never changes.

Is this a misconfiguration on my side, or is this simply not supported for S3 Parquet files?


import java.io.IOException;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

@Slf4j
public class FileSizeAndOnCheckpointRollingPolicy extends CheckpointRollingPolicy<CloudEventAvro, String> {

    private final long rollingFileSize;

    public FileSizeAndOnCheckpointRollingPolicy(long rollingFileSize) {
        this.rollingFileSize = rollingFileSize;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, CloudEventAvro element)
            throws IOException {
        // Roll the in-progress part file once its reported size exceeds the configured limit.
        log.info("Part size: {}, rolling file size: {}", partFileState.getSize(), rollingFileSize);
        return partFileState.getSize() > rollingFileSize;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
        // No time-based rolling; rely on the size check above and on checkpoints
        // (CheckpointRollingPolicy already rolls on every checkpoint).
        return false;
    }
}
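
In case it helps, the policy is attached to the sink roughly as sketched below; the bucket path, the 128 MiB limit and the AvroParquetWriters factory are placeholders standing in for the actual configuration, not the exact production code.

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

// Bulk-format sink with the custom size + checkpoint rolling policy.
FileSink<CloudEventAvro> sink = FileSink
    .forBulkFormat(
        new Path("s3a://my-bucket/events"),                        // placeholder bucket/prefix
        AvroParquetWriters.forSpecificRecord(CloudEventAvro.class)) // Avro -> Parquet writer factory
    .withRollingPolicy(new FileSizeAndOnCheckpointRollingPolicy(128 * 1024 * 1024L)) // e.g. 128 MiB
    .build();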