Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2020/07/30 18:29:43 UTC

Count of records in the Stream for a time window of 5s

Hi,

I'm trying to get a count of records in the stream for a time window of 5s, but I always get a count of 1.
I sent in 10 records and expect the count to be 10 at the end of the window.

I tried to follow the advice here from Fabian Hueske:
https://stackoverflow.com/questions/45606999/how-to-count-the-number-of-records-processed-by-apache-flink-in-a-given-time-win
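The pattern from that answer, as I understand it, boils down to roughly this (just a sketch for reference, not my actual code; the per-event-name key and the 5s window are my assumptions):

// rough sketch of the keyed windowed count from the SO answer (assumptions noted above)
// import org.apache.flink.api.common.typeinfo.Types;
// import org.apache.flink.api.java.tuple.Tuple2;
// import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<Tuple2<String, Long>> windowCounts = kinesisStream
        .map(m -> Tuple2.of(m.get(EVENT_NAME) != null ? (String) m.get(EVENT_NAME) : "", 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG)) // lambda needs explicit result type info
        .keyBy(t -> t.f0)                               // key by event name
        .timeWindow(Time.seconds(5))                    // 5s window
        .sum(1);                                        // sum the 1s => count per key per window

My actual code is below: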

DataStream<Map<String, Object>> kinesisStream;
...//get data from Kinesis source into kinesisStream - works fine
final SingleOutputStreamOperator<Map<String, Object>> filterDroppedEvents =
    kinesisStream
        .filter(resultMap -> {
            long timestamp = Utils.getEventTimestampFromMap(resultMap);
            long currTimestamp = System.currentTimeMillis();
            long driftFromCurrTS = currTimestamp - timestamp;
            if (driftFromCurrTS < 0) {
                Object eventNameObj = resultMap.get(EVENT_NAME);
                String eventName = eventNameObj != null ? (String) eventNameObj : "";
                logger.debug("PMS - event_timestamp is > current timestamp by driftFromCurrTS:{} for event_name:{} and event_timestamp:{}",
                        driftFromCurrTS, eventName, timestamp);
                return true;
            } else {
                return false;
            }
        });//*called 10 times here - GOOD*

final SingleOutputStreamOperator<CountRows> droppedEventsMapToCountRows =
    filterDroppedEvents
        .map(mapValue -> new CountRows(mapValue, 1L,
                mapValue.get(EVENT_NAME) != null ? (String) mapValue.get(EVENT_NAME) : ""));//*this is called 10 times - GOOD*

final KeyedStream<CountRows, String> countRowsKeyedStream =
    droppedEventsMapToCountRows.keyBy(new KeySelector<CountRows, String>() {
        @Override
        public String getKey(CountRows countRows) throws Exception {
            logger.info("Inside getKey");
            return countRows.getEventName();
        }
    });//*doesn't get in here to this logger statement ??*

final AllWindowedStream<CountRows, TimeWindow> countRowsTimeWindowAllWindowedStream =
    countRowsKeyedStream
        .*timeWindowAll*(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
        //.sum("count")
final SingleOutputStreamOperator<CountRows> countRowsReduceStream =
    countRowsTimeWindowAllWindowedStream.reduce((accum, input) -> {
        logger.info("Inside reduce");
        return new CountRows(input.getRow(), accum.getCount() + input.getCount(), input.getEventName());// sum 1s to count
    });//*don't see this logger statement "Inside reduce"*

DataStream<InfluxDBPoint> droppedEventsStream =
    countRowsReduceStream.flatMap(new FlatMapFunction<CountRows, InfluxDBPoint>() {
        @Override
        public void flatMap(CountRows countRows, Collector<InfluxDBPoint> out) throws Exception {
            logger.info("Inside final map"); // *only called once and countRows.getCount() is 1 - BAD - want it to be 10 ??*
            Map<String, Object> mapValue = countRows.getRow();
            //long currTimestamp = System.currentTimeMillis();
            Object eventTSObj = mapValue.get(EVENT_TIMESTAMP);
            String eventTimestamp = eventTSObj != null ? (String) eventTSObj : "";
            long eventTS = Utils.getLongFromDateStr(eventTimestamp);
            Map<String, String> tags = new HashMap<>();
            Object eventNameObj = mapValue.get(Utils.EVENT_NAME);
            String eventName = eventNameObj != null ? (String) eventNameObj : "";
            tags.put(Utils.EVENT_NAME, eventName);
            Map<String, Object> fields = new HashMap<>();
            fields.put("count", *countRows.getCount()*);
            out.collect(new InfluxDBPoint("dropped_events_count", eventTS, tags, fields));//TODO: measurement name
        }
    });
        /* *Tried map but doesn't work*
        reduceStream.map(countRows -> {
            logger.info("Inside final map");
            Map<String, Object> mapValue = countRows.getRow();
            //long currTimestamp = System.currentTimeMillis();
            Object eventTSObj = mapValue.get(EVENT_TIMESTAMP);
            String eventTimestamp = eventTSObj != null ? (String) eventTSObj : "";
            long eventTS = Utils.getLongFromDateStr(eventTimestamp);
            Map<String, String> tags = new HashMap<>();
            Object eventNameObj = mapValue.get(Utils.EVENT_NAME);
            String eventName = eventNameObj != null ? (String) eventNameObj : "";
            tags.put(Utils.EVENT_NAME, eventName);
            Map<String, Object> fields = new HashMap<>();
            fields.put("count", countRows.getCount());
            return new InfluxDBPoint("dropped_events_count", eventTS, tags, fields);//TODO: measurement name
        });*/
droppedEventsStream.addSink(influxSink);

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
CountRows is a *POJO wrapper around the Map<String, Object> to add the count*:
public static class *CountRows* implements Serializable, Comparable<CountRows> {
        Map<String, Object> row;
        Long *count*;
        String eventName;
.........
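The elided part is just the constructor and getters, roughly like this (a sketch inferred from how CountRows is used above; the compareTo ordering is an assumption):

        public CountRows(Map<String, Object> row, Long count, String eventName) {
            this.row = row;
            this.count = count;
            this.eventName = eventName;
        }

        public Map<String, Object> getRow() { return row; }
        public Long getCount() { return count; }
        public String getEventName() { return eventName; }

        @Override
        public int compareTo(CountRows other) {
            return Long.compare(this.count, other.count); // assumed: order by count
        }
}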



TIA,