Posted to user@flink.apache.org by Madhukar Thota <ma...@gmail.com> on 2017/08/10 12:52:20 UTC

Aggregation based on Timestamp

Hi

We have a use case where thousands of Telegraf agents send data to
Kafka (some at 10s intervals, some at 15s, and some at 30s). We would
like to aggregate the incoming data into 1-minute intervals, keyed by
hostname, before we write it to InfluxDB. Is this type of use case
possible with Flink? If so, is there a sample to get started?

Sample data (InfluxDB line protocol) coming from Kafka:

weather,location=us-midwest,season=summer temperature=82 1465839830100400200


-Madhu

Re: Aggregation based on Timestamp

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Yes, this is definitely doable in Flink, and should be very straightforward.

Basically, what you would do is define a FlinkKafkaConsumer source for your Kafka topic [1], followed by a keyBy operation on the hostname [2], and then a 1-minute time window aggregation [3]. At the end of your pipeline would be an InfluxDB sink. There isn’t one out of the box, but it should be fairly easy to implement.
If you want deterministic results based on event-time processing, that is also possible [4].
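
To make the key-then-window step concrete, here is a minimal sketch in plain Java (deliberately not Flink API) of what a keyBy followed by a 1-minute tumbling event-time window computes for line-protocol records. Everything named here is an assumption for illustration: the class `MinuteAggregation`, the helpers `parse`, `windowStart` and `averagePerMinute`, and the choice of an average as the aggregate. The sample record has no hostname tag, so the sketch keys on `location` as a stand-in; with Telegraf data you would pass whichever tag carries the hostname. The parser only handles a single numeric field and no escaping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Flink or InfluxDB.
class MinuteAggregation {
    static final long MINUTE_NANOS = 60L * 1_000_000_000L;

    /** One parsed point: value of the key tag, one numeric field, timestamp in nanoseconds. */
    static final class Point {
        final String key;
        final double value;
        final long timestampNanos;

        Point(String key, double value, long timestampNanos) {
            this.key = key;
            this.value = value;
            this.timestampNanos = timestampNanos;
        }
    }

    /** Parses "measurement,tag=v,... field=value timestamp" (single field, no escaping). */
    static Point parse(String line, String keyTag) {
        String[] parts = line.trim().split(" ");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 space-separated sections: " + line);
        }
        String key = null;
        String[] head = parts[0].split(",");
        for (int i = 1; i < head.length; i++) { // head[0] is the measurement name
            String[] kv = head[i].split("=", 2);
            if (kv.length == 2 && kv[0].equals(keyTag)) {
                key = kv[1];
            }
        }
        if (key == null) {
            throw new IllegalArgumentException("tag not found: " + keyTag);
        }
        String[] field = parts[1].split("=", 2);
        if (field.length != 2) {
            throw new IllegalArgumentException("malformed field section: " + parts[1]);
        }
        return new Point(key, Double.parseDouble(field[1]), Long.parseLong(parts[2]));
    }

    /** Start of the 1-minute tumbling window that contains the given event timestamp. */
    static long windowStart(long timestampNanos) {
        return timestampNanos - (timestampNanos % MINUTE_NANOS);
    }

    /** Mean value per (key, window); map keys have the form "key@windowStartNanos". */
    static Map<String, Double> averagePerMinute(List<Point> points) {
        Map<String, double[]> sumAndCount = new TreeMap<>();
        for (Point p : points) {
            String bucket = p.key + "@" + windowStart(p.timestampNanos);
            double[] acc = sumAndCount.computeIfAbsent(bucket, k -> new double[2]);
            acc[0] += p.value; // running sum
            acc[1] += 1;       // running count
        }
        Map<String, Double> means = new TreeMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            means.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return means;
    }

    public static void main(String[] args) {
        List<Point> points = new ArrayList<>();
        points.add(parse("weather,location=us-midwest,season=summer temperature=82 1465839830100400200", "location"));
        points.add(parse("weather,location=us-midwest,season=summer temperature=84 1465839820100400200", "location"));
        for (Map.Entry<String, Double> e : averagePerMinute(points).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

In an actual Flink job the bucketing and per-key state are handled by the keyed window operator described in [2] and [3], so only the parsing and the aggregate function would be your own code. The timestamp in the record is what you would hand to Flink for event-time processing [4].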

Just throwing you links to get started here :) Let us know if you have more problems getting started.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#datastream-transformations
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html
