Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2019/10/17 16:47:36 UTC

currentWatermark for Event Time is not increasing fast enough to go past the window.maxTimestamp

Hi,
*Event Time Window: 15s*
My currentWatermark for event-time processing is not increasing fast enough
to go past the window's maxTimestamp.
I have reduced the *bound* used for the watermark calculation to just *10 ms*.
I have increased the parallelInput to 2, so that input from Kinesis is
processed in parallel on 2 slots on my laptop:
env.addSource(kinesisConsumer).setParallelism(2);
For FlinkKinesisConsumer, I added a property available from flink-1.8.0:
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "25"); // this didn't seem to help
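
For context on why source parallelism matters here: a Flink operator with several input channels advances its event-time clock to the minimum of the watermarks received on those channels. The plain-Java sketch below illustrates only that rule; it has no Flink dependency, and the class name, method name, and sample values are hypothetical. Whether this applies to the job in this post depends on how many shards the stream has and where the assigner is attached, neither of which is stated.

```java
import java.util.Arrays;

// Illustrative only: models how per-channel watermarks combine downstream.
final class MinWatermarkSketch {

    // The downstream watermark is the minimum over all input channels.
    static long combined(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Both source subtasks have emitted a watermark.
        long[] bothActive = {17_490L, 17_490L};
        // One subtask has not emitted any watermark yet (assumed idle).
        long[] oneSilent = {17_490L, Long.MIN_VALUE};

        System.out.println("both active: " + combined(bothActive));
        System.out.println("one silent:  " + combined(oneSilent));
    }
}
```

If one of the two subtasks never receives records, the combined value stays at Long.MIN_VALUE and no event-time window can fire, however small the bound is.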

// in *EventTimeTrigger*.java:
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) return TriggerResult.FIRE;
My event producer writes to Kinesis with a delay of 2500 ms between records
(a business requirement).
What else can I do to consume data from Kinesis faster, so that
currentWatermark moves past window.maxTimestamp sooner?
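
To make the firing condition concrete, here is a minimal plain-Java sketch of the arithmetic for a 15 s tumbling window with a 10 ms bound and records spaced 2500 ms apart. It does not use Flink; the class and method names are hypothetical, and it assumes a window offset of 0, non-negative timestamps, and event timestamps that fall on exact multiples of 2500 ms.

```java
// Illustrative only: reproduces the window/watermark arithmetic by hand.
final class WindowFiringSketch {

    // Start of the tumbling window containing the timestamp (offset 0 assumed).
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    // Last timestamp that still belongs to the window: window end minus 1 ms.
    static long maxTimestamp(long windowStart, long windowSizeMs) {
        return windowStart + windowSizeMs - 1;
    }

    // Same comparison as the EventTimeTrigger check quoted in this post.
    static boolean fires(long windowMaxTimestamp, long currentWatermark) {
        return windowMaxTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        long windowSizeMs = 15_000L;
        long boundMs = 10L;
        long spacingMs = 2_500L;
        long maxTs = maxTimestamp(windowStart(0L, windowSizeMs), windowSizeMs);

        for (long eventTs = 0L; eventTs <= 20_000L; eventTs += spacingMs) {
            long watermark = eventTs - boundMs; // watermark = timestamp - bound
            System.out.println("eventTs=" + eventTs
                    + " watermark=" + watermark
                    + " fires=" + fires(maxTs, watermark));
        }
    }
}
```

Under these assumptions the event stamped 15000 yields a watermark of 14990, which is still below the window's maxTimestamp of 14999, so the window [0, 15000) fires only when the next event, stamped 17500, arrives.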

*MonitoringTSWAssigner* code:
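import java.util.Map;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {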
    private long bound = 5 * 1000L; // out-of-order bound in milliseconds (default 5 s)
    private long maxTimestamp = Long.MIN_VALUE;

    public MonitoringTSWAssigner() {
    }

    public MonitoringTSWAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
        long extractedTS = getExtractedTS(monitoring);
        if (extractedTS > maxTimestamp) {
            maxTimestamp = extractedTS;
        }
        return extractedTS;
        // return System.currentTimeMillis();
    }

    public long getExtractedTS(Map<String, Object> monitoring) {
        final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
                ? (String) monitoring.get(Utils.EVENT_TIMESTAMP)
                : "";
        return Utils.getLongFromDateStr(eventTimestamp);
    }

    @Override
    public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
        // extractedTimestamp is the value extractTimestamp just returned for this record
        long nextWatermark = extractedTimestamp - bound;
        return new Watermark(nextWatermark);
    }
}
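
As a rough illustration of what the assigner above produces over a sequence of records, the sketch below applies the same `timestamp - bound` formula to a list of event timestamps and keeps only the candidates that move the watermark forward, since a watermark that does not advance has no effect downstream. It is plain Java with no Flink dependency; the class name, method name, and sample timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: simulates per-record watermark candidates by hand.
final class PunctuatedWatermarkSketch {

    // Returns the watermarks that actually advance, in arrival order.
    static List<Long> advancingWatermarks(long[] eventTimestamps, long boundMs) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            long candidate = ts - boundMs; // same formula as checkAndGetNextWatermark
            if (candidate > current) {     // a candidate that does not advance is dropped
                current = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The second record arrives out of order and does not move the watermark.
        long[] timestamps = {5_000L, 2_500L, 7_500L};
        System.out.println(advancingWatermarks(timestamps, 10L));
    }
}
```

One consequence is that the watermark only moves when a record arrives: with one record every 2500 ms, it advances in 2500 ms steps at best.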

TIA