Posted to users@kafka.apache.org by Furkan KAMACI <fu...@gmail.com> on 2016/10/18 12:00:10 UTC

Kafka Streams Aggregate By Date

Hi,

I could successfully run Kafka in my environment. I want to monitor queries
per second at my search application with Kafka. Whenever a search request
is done, I create a ProducerRecord which holds the current nano time of the
system.

I know that I have to use a streaming API for calculation i.e. Kafka
Streams or Spark Streams. My choice is to use Kafka Streams.

For the last hour, or since the beginning, I have to calculate the queries
per second. How can I do such an aggregation with Kafka Streams?

Kind Regards,
Furkan KAMACI

Re: Kafka Streams Aggregate By Date

Posted by "Matthias J. Sax" <ma...@confluent.io>.

About auto-topic creation: If your broker configuration allows for
this, yes, it would work. However, keep in mind that the topic will be
created with default values (according to the broker config) with
regard to the number of partitions and replication factor. Those values
might not meet your needs. Therefore, it is highly recommended not to
rely on topic auto-creation but to create all topics manually (to
specify the number of partitions and replication factor with values
that do meet your needs).
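For example, you could create the output topic up front with the
kafka-topics tool that ships with Kafka (the topic name, partition
count, and replication factor below are only placeholders; pick values
that match your setup):

    kafka-topics --create --zookeeper localhost:2181 \
        --topic output-topic --partitions 4 --replication-factor 1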

And great that you got window-count working.

From your email, I am not sure whether you are stuck again and, if yes,
what your question is.

-Matthias

On 10/19/16 11:03 AM, Furkan KAMACI wrote:
> I could successfully get the total (not average). As far as I see,
> there is no need to create a topic manually before I run the app.
> Topic is created if there is data and topic name not exists. Here
> is my code:
> 
> KStreamBuilder builder = new KStreamBuilder();
> 
> KStream<String, String> longs = builder.stream(Serdes.String(), 
> Serdes.String(), "mytopic");
> 
> KTable<Windowed<String>, Long> longCounts = 
> longs.countByKey(TimeWindows.of("output-topic", 3600 * 1000), 
> Serdes.String());
> 
> KafkaStreams streams = new KafkaStreams(builder, 
> streamsConfiguration); streams.start();
> 
> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> 
> On Wed, Oct 19, 2016 at 1:58 AM, Matthias J. Sax
> <ma...@confluent.io> wrote:
> 
> You should create input/intermediate and output topic manually
> before you start you Kafka Streams application.
> 
> 
> -Matthias
> 
> On 10/18/16 3:34 PM, Furkan KAMACI wrote:
>>>> Sorry about concurrent questions. Tried below code, didn't
>>>> get any error but couldn't get created output topic:
>>>> 
>>>> Properties props = new Properties();
>>>> props.put("bootstrap.servers", "localhost:9092");
>>>> props.put("acks", "all"); props.put("retries", 0);
>>>> props.put("batch.size", 16384); props.put("linger.ms", 1); 
>>>> props.put("buffer.memory", 33554432);
>>>> props.put("key.serializer", 
>>>> "org.apache.kafka.common.serialization.StringSerializer"); 
>>>> props.put("value.serializer", 
>>>> "org.apache.kafka.common.serialization.StringSerializer");
>>>> 
>>>> Producer<String, String> producer = new
>>>> KafkaProducer<>(props);
>>>> 
>>>> for (int i = 0; i < 1000; i++) { producer.send(new 
>>>> ProducerRecord<>( "input-topic",
>>>> String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
>>>> System.nanoTime() * 1e-9, i)));
>>>> 
>>>> 
>>>> final KStreamBuilder builder = new KStreamBuilder(); final 
>>>> KStream<String, Long> qps = builder.stream(Serdes.String(), 
>>>> Serdes.Long(), "input-topic"); 
>>>> qps.countByKey(TimeWindows.of("Hourly", 3600 * 
>>>> 1000)).mapValues(Object::toString).to("output-topic");
>>>> 
>>>> final KafkaStreams streams = new KafkaStreams(builder, 
>>>> streamsConfiguration); streams.start();
>>>> 
>>>> Runtime.getRuntime().addShutdownHook(new
>>>> Thread(streams::close));
>>>> 
>>>> On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax 
>>>> <ma...@confluent.io> wrote:
>>>> 
>>>> Two things:
>>>> 
>>>> 1) you should not apply the window to the first count, but to
>>>> the base stream to get correct results.
>>>> 
>>>> 2) your windowed aggregation, doew not just return String
>>>> type, but Window<K> type. Thus, you need to either insert a
>>>> .map() to transform you data into String typo, or you provide
>>>> a custom serializer when writing data to output topic
>>>> (method, .to(...) has multiple overloads)
>>>> 
>>>> Per default, each topic read/write operation uses Serdes from
>>>> the streams config. If you data has a different type, you
>>>> need to provide appropriate Serdes for those operators.
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 10/18/16 2:01 PM, Furkan KAMACI wrote:
>>>>>>> Hi Matthias,
>>>>>>> 
>>>>>>> I've tried this code:
>>>>>>> 
>>>>>>> final Properties streamsConfiguration = new Properties();
>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
>>>>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>>>>> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>>>>>>> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>>>>> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>>>>> final KStreamBuilder builder = new KStreamBuilder();
>>>>>>> final KStream input = builder.stream("myapp-test");
>>>>>>> 
>>>>>>> final KStream<String, Long> searchCounts = input.countByKey("SearchRequests").toStream();
>>>>>>> searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).to("outputTopicHourlyCounts");
>>>>>>> 
>>>>>>> final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>>>>>>> streams.start();
>>>>>>> 
>>>>>>> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>>>>> 
>>>>>>> However I get an error:
>>>>>>> 
>>>>>>> 
>>>>>>> *Exception in thread "StreamThread-1" 
>>>>>>> java.lang.ClassCastException: 
>>>>>>> org.apache.kafka.streams.kstream.Windowed cannot be
>>>>>>> cast to java.lang.String*
>>>>>>> 
>>>>>>> On the other hand when I try this code:
>>>>>>> 
>>>>>>> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
>>>>>>> 
>>>>>>> I get an error too which indicates that:
>>>>>>> 
>>>>>>> *Exception in thread "StreamThread-1" 
>>>>>>> org.apache.kafka.common.errors.SerializationException:
>>>>>>> Size of data received by LongDeserializer is not 8 *
>>>>>>> 
>>>>>>> Here is generated topic:
>>>>>>> 
>>>>>>> *kafka-console-consumer --zookeeper localhost:2181
>>>>>>> --topic myapp-test --from-beginning* *
>>>>>>> 28952314828122* * 28988681653726* *
>>>>>>> 29080089383233*
>>>>>>> 
>>>>>>> I know that I miss something but couldn't find it.
>>>>>>> 
>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>> 
>>>>>>> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax 
>>>>>>> <ma...@confluent.io> wrote:
>>>>>>> 
>>>>>>> I see. KGroupedStream will be part of 0.10.1.0 (should
>>>>>>> be release the next weeks).
>>>>>>> 
>>>>>>> So, instead of
>>>>>>> 
>>>>>>>>>> .groupByKey().count()
>>>>>>> 
>>>>>>> you need to do
>>>>>>> 
>>>>>>>>>> .countByKey()
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
>>>>>>>>>> Hi Matthias,
>>>>>>>>>> 
>>>>>>>>>> Thanks for your detailed answer. By the way I
>>>>>>>>>> couldn't find "KGroupedStream" at version of
>>>>>>>>>> 0.10.0.1?
>>>>>>>>>> 
>>>>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>>>>> 
>>>>>>>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax 
>>>>>>>>>> <ma...@confluent.io> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> You just need to read you stream and apply an 
>>>>>>>>>> (windowed) aggregation on it.
>>>>>>>>>> 
>>>>>>>>>> If you use non-windowed aggregation you will get
>>>>>>>>>> "since the beginning". If you use windowed
>>>>>>>>>> aggregation you can specify the window size as 1
>>>>>>>>>> hour and get those results.
>>>>>>>>>> 
>>>>>>>>>> One comment: it seems that you want to count
>>>>>>>>>> *all* queries. To make this work, you need to
>>>>>>>>>> make sure all records are using the same key
>>>>>>>>>> (because Kafka Streams only supports aggregation
>>>>>>>>>> over keyed streams). Keep in mind, that this
>>>>>>>>>> prohibits parallelization of you aggregation!
>>>>>>>>>> 
>>>>>>>>>> As a workaround, you could also do two
>>>>>>>>>> consecutive aggregation, and do parallelize the
>>>>>>>>>> first one, and do not parallelize the second one
>>>>>>>>>> (ie, using the first one as a pre aggregation
>>>>>>>>>> similar to a combine step)
>>>>>>>>>> 
>>>>>>>>>> Without pre aggregation and assuming all records
>>>>>>>>>> use the same key something like this (for current
>>>>>>>>>> trunk):
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>>>>>> KStream input = builder.stream("yourTopic");
>>>>>>>>>>>>> 
>>>>>>>>>>>>> KGroupedStream groupedInput = input.groupByKey();
>>>>>>>>>>>>> 
>>>>>>>>>>>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
>>>>>>>>>>>>> 
>>>>>>>>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000),
>>>>>>>>>>>>>     "windowedCountStore").to("outputTopicHourlyCounts");
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 
>>>>>>>>>> For more details, please see the docs and examples:
>>>>>>>>>> 
>>>>>>>>>> - http://docs.confluent.io/current/streams/index.html
>>>>>>>>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I could successfully run Kafka at my
>>>>>>>>>>>>> environment. I want to monitor Queries per
>>>>>>>>>>>>> Second at my search application with Kafka.
>>>>>>>>>>>>> Whenever a search request is done I create
>>>>>>>>>>>>> a ProducerRecord which holds current nano
>>>>>>>>>>>>> time of the system.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I know that I have to use a streaming API
>>>>>>>>>>>>> for calculation i.e. Kafka Streams or Spark
>>>>>>>>>>>>> Streams. My choice is to use Kafka
>>>>>>>>>>>>> Streams.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> For last 1 hours, or since the beginning, I
>>>>>>>>>>>>> have to calculate the queries per second.
>>>>>>>>>>>>> How can I make such an aggregation at Kafka
>>>>>>>>>>>>> Streams?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> 
> 

Re: Kafka Streams Aggregate By Date

Posted by Furkan KAMACI <fu...@gmail.com>.
I could successfully get the total (not the average). As far as I see, there is
no need to create a topic manually before I run the app. The topic is created
if there is data and the topic does not exist yet. Here is my code:

KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> longs = builder.stream(Serdes.String(),
Serdes.String(), "mytopic");

        KTable<Windowed<String>, Long> longCounts =
                longs.countByKey(TimeWindows.of("output-topic", 3600 *
1000),
                        Serdes.String());

        KafkaStreams streams = new KafkaStreams(builder,
streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

On Wed, Oct 19, 2016 at 1:58 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

>
> You should create input/intermediate and output topic manually before
> you start you Kafka Streams application.
>
>
> - -Matthias
>
> On 10/18/16 3:34 PM, Furkan KAMACI wrote:
> > Sorry about concurrent questions. Tried below code, didn't get any
> > error but couldn't get created output topic:
> >
> > Properties props = new Properties(); props.put("bootstrap.servers",
> > "localhost:9092"); props.put("acks", "all"); props.put("retries",
> > 0); props.put("batch.size", 16384); props.put("linger.ms", 1);
> > props.put("buffer.memory", 33554432); props.put("key.serializer",
> > "org.apache.kafka.common.serialization.StringSerializer");
> > props.put("value.serializer",
> > "org.apache.kafka.common.serialization.StringSerializer");
> >
> > Producer<String, String> producer = new KafkaProducer<>(props);
> >
> > for (int i = 0; i < 1000; i++) { producer.send(new
> > ProducerRecord<>( "input-topic", String.format("{\"type\":\"test\",
> > \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
> >
> >
> > final KStreamBuilder builder = new KStreamBuilder(); final
> > KStream<String, Long> qps = builder.stream(Serdes.String(),
> > Serdes.Long(), "input-topic");
> > qps.countByKey(TimeWindows.of("Hourly", 3600 *
> > 1000)).mapValues(Object::toString).to("output-topic");
> >
> > final KafkaStreams streams = new KafkaStreams(builder,
> > streamsConfiguration); streams.start();
> >
> > Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >
> > On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax
> > <ma...@confluent.io> wrote:
> >
> > Two things:
> >
> > 1) you should not apply the window to the first count, but to the
> > base stream to get correct results.
> >
> > 2) your windowed aggregation, doew not just return String type,
> > but Window<K> type. Thus, you need to either insert a .map() to
> > transform you data into String typo, or you provide a custom
> > serializer when writing data to output topic (method, .to(...) has
> > multiple overloads)
> >
> > Per default, each topic read/write operation uses Serdes from the
> > streams config. If you data has a different type, you need to
> > provide appropriate Serdes for those operators.
> >
> >
> > -Matthias
> >
> > On 10/18/16 2:01 PM, Furkan KAMACI wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> I've tried this code:
> >>>>
> >>>> final Properties streamsConfiguration = new Properties();
> >>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
> >>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >>>> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
> >>>> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> >>>> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> >>>> final KStreamBuilder builder = new KStreamBuilder();
> >>>> final KStream input = builder.stream("myapp-test");
> >>>>
> >>>> final KStream<String, Long> searchCounts = input.countByKey("SearchRequests").toStream();
> >>>> searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).to("outputTopicHourlyCounts");
> >>>>
> >>>> final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
> >>>> streams.start();
> >>>>
> >>>> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >>>>
> >>>> However I get an error:
> >>>>
> >>>>
> >>>> *Exception in thread "StreamThread-1"
> >>>> java.lang.ClassCastException:
> >>>> org.apache.kafka.streams.kstream.Windowed cannot be cast to
> >>>> java.lang.String*
> >>>>
> >>>> On the other hand when I try this code:
> >>>>
> >>>> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
> >>>>
> >>>> I get an error too which indicates that:
> >>>>
> >>>> *Exception in thread "StreamThread-1"
> >>>> org.apache.kafka.common.errors.SerializationException: Size
> >>>> of data received by LongDeserializer is not 8 *
> >>>>
> >>>> Here is generated topic:
> >>>>
> >>>> *kafka-console-consumer --zookeeper localhost:2181 --topic
> >>>> myapp-test --from-beginning* *        28952314828122* *
> >>>> 28988681653726* *        29080089383233*
> >>>>
> >>>> I know that I miss something but couldn't find it.
> >>>>
> >>>> Kind Regards, Furkan KAMACI
> >>>>
> >>>> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax
> >>>> <ma...@confluent.io> wrote:
> >>>>
> >>>> I see. KGroupedStream will be part of 0.10.1.0 (should be
> >>>> release the next weeks).
> >>>>
> >>>> So, instead of
> >>>>
> >>>>>>> .groupByKey().count()
> >>>>
> >>>> you need to do
> >>>>
> >>>>>>> .countByKey()
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Thanks for your detailed answer. By the way I couldn't
> >>>>>>> find "KGroupedStream" at version of 0.10.0.1?
> >>>>>>>
> >>>>>>> Kind Regards, Furkan KAMACI
> >>>>>>>
> >>>>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> >>>>>>> <ma...@confluent.io> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> You just need to read you stream and apply an
> >>>>>>> (windowed) aggregation on it.
> >>>>>>>
> >>>>>>> If you use non-windowed aggregation you will get "since
> >>>>>>> the beginning". If you use windowed aggregation you can
> >>>>>>> specify the window size as 1 hour and get those
> >>>>>>> results.
> >>>>>>>
> >>>>>>> One comment: it seems that you want to count *all*
> >>>>>>> queries. To make this work, you need to make sure all
> >>>>>>> records are using the same key (because Kafka Streams
> >>>>>>> only supports aggregation over keyed streams). Keep in
> >>>>>>> mind, that this prohibits parallelization of you
> >>>>>>> aggregation!
> >>>>>>>
> >>>>>>> As a workaround, you could also do two consecutive
> >>>>>>> aggregation, and do parallelize the first one, and do
> >>>>>>> not parallelize the second one (ie, using the first one
> >>>>>>> as a pre aggregation similar to a combine step)
> >>>>>>>
> >>>>>>> Without pre aggregation and assuming all records use
> >>>>>>> the same key something like this (for current trunk):
> >>>>>>>
> >>>>>>>
> >>>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
> >>>>>>>>>> KStream input = builder.stream("yourTopic");
> >>>>>>>>>>
> >>>>>>>>>> KGroupedStream groupedInput = input.groupByKey();
> >>>>>>>>>>
> >>>>>>>>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> >>>>>>>>>>
> >>>>>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000),
> >>>>>>>>>>     "windowedCountStore").to("outputTopicHourlyCounts");
> >>>>>>>
> >>>>>>>
> >>>>>>> For more details, please see the docs and examples:
> >>>>>>>
> >>>>>>> - http://docs.confluent.io/current/streams/index.html
> >>>>>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I could successfully run Kafka at my environment.
> >>>>>>>>>> I want to monitor Queries per Second at my
> >>>>>>>>>> search application with Kafka. Whenever a search
> >>>>>>>>>> request is done I create a ProducerRecord which
> >>>>>>>>>> holds current nano time of the system.
> >>>>>>>>>>
> >>>>>>>>>> I know that I have to use a streaming API for
> >>>>>>>>>> calculation i.e. Kafka Streams or Spark Streams.
> >>>>>>>>>> My choice is to use Kafka Streams.
> >>>>>>>>>>
> >>>>>>>>>> For last 1 hours, or since the beginning, I have
> >>>>>>>>>> to calculate the queries per second. How can I
> >>>>>>>>>> make such an aggregation at Kafka Streams?
> >>>>>>>>>>
> >>>>>>>>>> Kind Regards, Furkan KAMACI
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> >
>

Re: Kafka Streams Aggregate By Date

Posted by "Matthias J. Sax" <ma...@confluent.io>.

You should create the input/intermediate and output topics manually before
you start your Kafka Streams application.


-Matthias

On 10/18/16 3:34 PM, Furkan KAMACI wrote:
> Sorry about concurrent questions. Tried below code, didn't get any
> error but couldn't get created output topic:
> 
> Properties props = new Properties(); props.put("bootstrap.servers",
> "localhost:9092"); props.put("acks", "all"); props.put("retries",
> 0); props.put("batch.size", 16384); props.put("linger.ms", 1); 
> props.put("buffer.memory", 33554432); props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer"); 
> props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> 
> Producer<String, String> producer = new KafkaProducer<>(props);
> 
> for (int i = 0; i < 1000; i++) { producer.send(new
> ProducerRecord<>( "input-topic", String.format("{\"type\":\"test\",
> \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
> 
> 
> final KStreamBuilder builder = new KStreamBuilder(); final
> KStream<String, Long> qps = builder.stream(Serdes.String(), 
> Serdes.Long(), "input-topic"); 
> qps.countByKey(TimeWindows.of("Hourly", 3600 * 
> 1000)).mapValues(Object::toString).to("output-topic");
> 
> final KafkaStreams streams = new KafkaStreams(builder, 
> streamsConfiguration); streams.start();
> 
> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> 
> On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax
> <ma...@confluent.io> wrote:
> 
> Two things:
> 
> 1) you should not apply the window to the first count, but to the
> base stream to get correct results.
> 
> 2) your windowed aggregation, doew not just return String type,
> but Window<K> type. Thus, you need to either insert a .map() to
> transform you data into String typo, or you provide a custom
> serializer when writing data to output topic (method, .to(...) has
> multiple overloads)
> 
> Per default, each topic read/write operation uses Serdes from the 
> streams config. If you data has a different type, you need to
> provide appropriate Serdes for those operators.
> 
> 
> -Matthias
> 
> On 10/18/16 2:01 PM, Furkan KAMACI wrote:
>>>> Hi Matthias,
>>>> 
>>>> I've tried this code:
>>>> 
>>>> final Properties streamsConfiguration = new Properties();
>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>>>> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>> final KStreamBuilder builder = new KStreamBuilder();
>>>> final KStream input = builder.stream("myapp-test");
>>>> 
>>>> final KStream<String, Long> searchCounts = input.countByKey("SearchRequests").toStream();
>>>> searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).to("outputTopicHourlyCounts");
>>>> 
>>>> final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
>>>> streams.start();
>>>> 
>>>> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>> 
>>>> However I get an error:
>>>> 
>>>> 
>>>> *Exception in thread "StreamThread-1" 
>>>> java.lang.ClassCastException: 
>>>> org.apache.kafka.streams.kstream.Windowed cannot be cast to 
>>>> java.lang.String*
>>>> 
>>>> On the other hand when I try this code:
>>>> 
>>>> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
>>>> 
>>>> I get an error too which indicates that:
>>>> 
>>>> *Exception in thread "StreamThread-1" 
>>>> org.apache.kafka.common.errors.SerializationException: Size
>>>> of data received by LongDeserializer is not 8 *
>>>> 
>>>> Here is generated topic:
>>>> 
>>>> *kafka-console-consumer --zookeeper localhost:2181 --topic 
>>>> myapp-test --from-beginning* *        28952314828122* * 
>>>> 28988681653726* *        29080089383233*
>>>> 
>>>> I know that I miss something but couldn't find it.
>>>> 
>>>> Kind Regards, Furkan KAMACI
>>>> 
>>>> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax 
>>>> <ma...@confluent.io> wrote:
>>>> 
>>>> I see. KGroupedStream will be part of 0.10.1.0 (should be
>>>> release the next weeks).
>>>> 
>>>> So, instead of
>>>> 
>>>>>>> .groupByKey().count()
>>>> 
>>>> you need to do
>>>> 
>>>>>>> .countByKey()
>>>> 
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
>>>>>>> Hi Matthias,
>>>>>>> 
>>>>>>> Thanks for your detailed answer. By the way I couldn't
>>>>>>> find "KGroupedStream" at version of 0.10.0.1?
>>>>>>> 
>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>> 
>>>>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax 
>>>>>>> <ma...@confluent.io> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> You just need to read you stream and apply an
>>>>>>> (windowed) aggregation on it.
>>>>>>> 
>>>>>>> If you use non-windowed aggregation you will get "since
>>>>>>> the beginning". If you use windowed aggregation you can
>>>>>>> specify the window size as 1 hour and get those
>>>>>>> results.
>>>>>>> 
>>>>>>> One comment: it seems that you want to count *all*
>>>>>>> queries. To make this work, you need to make sure all
>>>>>>> records are using the same key (because Kafka Streams
>>>>>>> only supports aggregation over keyed streams). Keep in
>>>>>>> mind, that this prohibits parallelization of you
>>>>>>> aggregation!
>>>>>>> 
>>>>>>> As a workaround, you could also do two consecutive 
>>>>>>> aggregation, and do parallelize the first one, and do
>>>>>>> not parallelize the second one (ie, using the first one
>>>>>>> as a pre aggregation similar to a combine step)
>>>>>>> 
>>>>>>> Without pre aggregation and assuming all records use
>>>>>>> the same key something like this (for current trunk):
>>>>>>> 
>>>>>>> 
>>>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>>> KStream input = builder.stream("yourTopic");
>>>>>>>>>> 
>>>>>>>>>> KGroupedStream groupedInput = input.groupByKey();
>>>>>>>>>> 
>>>>>>>>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
>>>>>>>>>> 
>>>>>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000),
>>>>>>>>>>     "windowedCountStore").to("outputTopicHourlyCounts");
>>>>>>> 
>>>>>>> 
>>>>>>> For more details, please see the docs and examples:
>>>>>>> 
>>>>>>> - http://docs.confluent.io/current/streams/index.html
>>>>>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
>>>>>>> 
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> I could successfully run Kafka at my environment.
>>>>>>>>>> I want to monitor Queries per Second at my
>>>>>>>>>> search application with Kafka. Whenever a search
>>>>>>>>>> request is done I create a ProducerRecord which
>>>>>>>>>> holds current nano time of the system.
>>>>>>>>>> 
>>>>>>>>>> I know that I have to use a streaming API for 
>>>>>>>>>> calculation i.e. Kafka Streams or Spark Streams.
>>>>>>>>>> My choice is to use Kafka Streams.
>>>>>>>>>> 
>>>>>>>>>> For last 1 hours, or since the beginning, I have
>>>>>>>>>> to calculate the queries per second. How can I
>>>>>>>>>> make such an aggregation at Kafka Streams?
>>>>>>>>>> 
>>>>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>> 
> 

Re: Kafka Streams Aggregate By Date

Posted by Furkan KAMACI <fu...@gmail.com>.
Sorry about the concurrent questions. I tried the code below and didn't get
any error, but the output topic was not created:

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>(
                        "input-topic",
                        String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
                                System.nanoTime() * 1e-9, i)));
        }


        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, Long> qps = builder.stream(Serdes.String(),
Serdes.Long(), "input-topic");
        qps.countByKey(TimeWindows.of("Hourly", 3600 *
1000)).mapValues(Object::toString).to("output-topic");

        final KafkaStreams streams = new KafkaStreams(builder,
streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

>
> Two things:
>
> 1) you should not apply the window to the first count, but to the base
> stream to get correct results.
>
> 2) your windowed aggregation, doew not just return String type, but
> Window<K> type. Thus, you need to either insert a .map() to transform
> you data into String typo, or you provide a custom serializer when
> writing data to output topic (method, .to(...) has multiple overloads)
>
> Per default, each topic read/write operation uses Serdes from the
> streams config. If you data has a different type, you need to provide
> appropriate Serdes for those operators.
>
>
> - -Matthias
>
> On 10/18/16 2:01 PM, Furkan KAMACI wrote:
> > Hi Matthias,
> >
> > I've tried this code:
> >
> > *        final Properties streamsConfiguration = new
> > Properties();* *
> > streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > "myapp");* *
> > streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > "localhost:9092");* *
> > streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> > "localhost:2181");* *
> > streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());* *
> > streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());* *        final
> > KStreamBuilder builder = new KStreamBuilder();* *        final
> > KStream input = builder.stream("myapp-test");*
> >
> > *        final KStream<String, Long> searchCounts =
> > input.countByKey("SearchRequests").toStream();* *
> > searchCounts.countByKey(TimeWindows.of("Hourly", 3600 *
> > 1000)).to("outputTopicHourlyCounts");*
> >
> > *        final KafkaStreams streams = new KafkaStreams(builder,
> > streamsConfiguration);* *        streams.start();*
> >
> > *        Runtime.getRuntime().addShutdownHook(new
> > Thread(streams::close));*
> >
> > However I get an error:
> >
> >
> > *Exception in thread "StreamThread-1"
> > java.lang.ClassCastException:
> > org.apache.kafka.streams.kstream.Windowed cannot be cast to
> > java.lang.String*
> >
> > On the other hand when I try this code:
> >
> > https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
> >
> >  I get an error too which indicates that:
> >
> > *Exception in thread "StreamThread-1"
> > org.apache.kafka.common.errors.SerializationException: Size of
> > data received by LongDeserializer is not 8 *
> >
> > Here is generated topic:
> >
> > *kafka-console-consumer --zookeeper localhost:2181 --topic
> > myapp-test --from-beginning* *        28952314828122* *
> > 28988681653726* *        29080089383233*
> >
> > I know that I miss something but couldn't find it.
> >
> > Kind Regards, Furkan KAMACI
> >
> > On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax
> > <ma...@confluent.io> wrote:
> >
> > I see. KGroupedStream will be part of 0.10.1.0 (should be release
> > the next weeks).
> >
> > So, instead of
> >
> >>>> .groupByKey().count()
> >
> > you need to do
> >
> >>>> .countByKey()
> >
> >
> >
> > -Matthias
> >
> > On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for your detailed answer. By the way I couldn't find
> >>>> "KGroupedStream" at version of 0.10.0.1?
> >>>>
> >>>> Kind Regards, Furkan KAMACI
> >>>>
> >>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> >>>> <ma...@confluent.io> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> You just need to read you stream and apply an (windowed)
> >>>> aggregation on it.
> >>>>
> >>>> If you use non-windowed aggregation you will get "since the
> >>>> beginning". If you use windowed aggregation you can specify
> >>>> the window size as 1 hour and get those results.
> >>>>
> >>>> One comment: it seems that you want to count *all* queries.
> >>>> To make this work, you need to make sure all records are
> >>>> using the same key (because Kafka Streams only supports
> >>>> aggregation over keyed streams). Keep in mind, that this
> >>>> prohibits parallelization of you aggregation!
> >>>>
> >>>> As a workaround, you could also do two consecutive
> >>>> aggregation, and do parallelize the first one, and do not
> >>>> parallelize the second one (ie, using the first one as a pre
> >>>> aggregation similar to a combine step)
> >>>>
> >>>> Without pre aggregation and assuming all records use the same
> >>>> key something like this (for current trunk):
> >>>>
> >>>>
> >>>>>>> KStreamBuilder builder = new KStreamBuilder();
> >>>>>>> KStream input = builder.stream("yourTopic");
> >>>>>>>
> >>>>>>> KGroupedStream groupedInput = input.groupByKey();
> >>>>>>>
> >>>>>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> >>>>>>>
> >>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000),
> >>>>>>>     "windowedCountStore").to("outputTopicHourlyCounts");
> >>>>
> >>>>
> >>>> For more details, please see the docs and examples:
> >>>>
> >>>> - http://docs.confluent.io/current/streams/index.html
> >>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I could successfully run Kafka at my environment. I
> >>>>>>> want to monitor Queries per Second at my search
> >>>>>>> application with Kafka. Whenever a search request is
> >>>>>>> done I create a ProducerRecord which holds current nano
> >>>>>>> time of the system.
> >>>>>>>
> >>>>>>> I know that I have to use a streaming API for
> >>>>>>> calculation i.e. Kafka Streams or Spark Streams. My
> >>>>>>> choice is to use Kafka Streams.
> >>>>>>>
> >>>>>>> For last 1 hours, or since the beginning, I have to
> >>>>>>> calculate the queries per second. How can I make such
> >>>>>>> an aggregation at Kafka Streams?
> >>>>>>>
> >>>>>>> Kind Regards, Furkan KAMACI
> >>>>>>>
> >>>>>
> >>>>
> >>
> >
>

Re: Kafka Streams Aggregate By Date

Posted by "Matthias J. Sax" <ma...@confluent.io>.

Two things:

1) You should not apply the window to the first count, but to the base
stream, to get correct results.

2) Your windowed aggregation does not just return String type, but
Windowed<K> type. Thus, you need to either insert a .map() to transform
your data into String type, or you provide a custom serializer when
writing data to the output topic (the method .to(...) has multiple
overloads).

By default, each topic read/write operation uses the Serdes from the
streams config. If your data has a different type, you need to provide
appropriate Serdes for those operators.
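
For example, the .map() variant could look roughly like this (just a
sketch against the 0.10.0.x API; "input" stands for your source
KStream<String, String>, and the output topic name is a placeholder):

    KTable<Windowed<String>, Long> counts =
        input.countByKey(TimeWindows.of("Hourly", 3600 * 1000), Serdes.String());

    counts.toStream()
          // turn the Windowed<String> key into a plain String so that the
          // String Serdes can be used when writing to the output topic
          .map((windowedKey, count) -> new KeyValue<>(
                  windowedKey.key() + "@" + windowedKey.window().start(),
                  count.toString()))
          .to(Serdes.String(), Serdes.String(), "output-topic");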


-Matthias

On 10/18/16 2:01 PM, Furkan KAMACI wrote:
> Hi Matthias,
> 
> I've tried this code:
> 
> *        final Properties streamsConfiguration = new
> Properties();* *
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "myapp");* *
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");* *
> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, 
> "localhost:2181");* *
> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass().getName());* *
> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass().getName());* *        final
> KStreamBuilder builder = new KStreamBuilder();* *        final
> KStream input = builder.stream("myapp-test");*
> 
> *        final KStream<String, Long> searchCounts = 
> input.countByKey("SearchRequests").toStream();* *
> searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 
> 1000)).to("outputTopicHourlyCounts");*
> 
> *        final KafkaStreams streams = new KafkaStreams(builder, 
> streamsConfiguration);* *        streams.start();*
> 
> *        Runtime.getRuntime().addShutdownHook(new
> Thread(streams::close));*
> 
> However I get an error:
> 
> 
> *Exception in thread "StreamThread-1"
> java.lang.ClassCastException: 
> org.apache.kafka.streams.kstream.Windowed cannot be cast to 
> java.lang.String*
> 
> On the other hand when I try this code:
> 
> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
>
>  I get an error too which indicates that:
> 
> *Exception in thread "StreamThread-1" 
> org.apache.kafka.common.errors.SerializationException: Size of
> data received by LongDeserializer is not 8 *
> 
> Here is generated topic:
> 
> *kafka-console-consumer --zookeeper localhost:2181 --topic 
> myapp-test --from-beginning* *        28952314828122* *
> 28988681653726* *        29080089383233*
> 
> I know that I miss something but couldn't find it.
> 
> Kind Regards, Furkan KAMACI
> 
> On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax
> <ma...@confluent.io> wrote:
> 
> I see. KGroupedStream will be part of 0.10.1.0 (should be release
> the next weeks).
> 
> So, instead of
> 
>>>> .groupByKey().count()
> 
> you need to do
> 
>>>> .countByKey()
> 
> 
> 
> -Matthias
> 
> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
>>>> Hi Matthias,
>>>> 
>>>> Thanks for your detailed answer. By the way I couldn't find 
>>>> "KGroupedStream" at version of 0.10.0.1?
>>>> 
>>>> Kind Regards, Furkan KAMACI
>>>> 
>>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax 
>>>> <ma...@confluent.io> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> You just need to read you stream and apply an (windowed) 
>>>> aggregation on it.
>>>> 
>>>> If you use non-windowed aggregation you will get "since the 
>>>> beginning". If you use windowed aggregation you can specify
>>>> the window size as 1 hour and get those results.
>>>> 
>>>> One comment: it seems that you want to count *all* queries.
>>>> To make this work, you need to make sure all records are
>>>> using the same key (because Kafka Streams only supports
>>>> aggregation over keyed streams). Keep in mind, that this
>>>> prohibits parallelization of you aggregation!
>>>> 
>>>> As a workaround, you could also do two consecutive
>>>> aggregation, and do parallelize the first one, and do not
>>>> parallelize the second one (ie, using the first one as a pre
>>>> aggregation similar to a combine step)
>>>> 
>>>> Without pre aggregation and assuming all records use the same
>>>> key something like this (for current trunk):
>>>> 
>>>> 
>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>> KStream input = builder.stream("yourTopic");
>>>>>>> 
>>>>>>> KGroupedStream groupedInput = input.groupByKey();
>>>>>>> 
>>>>>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
>>>>>>> 
>>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000),
>>>>>>>     "windowedCountStore").to("outputTopicHourlyCounts");
>>>> 
>>>> 
>>>> For more details, please see the docs and examples:
>>>> 
>>>> - http://docs.confluent.io/current/streams/index.html
>>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>>>>> Hi,
>>>>>>> 
>>>>>>> I could successfully run Kafka at my environment. I
>>>>>>> want to monitor Queries per Second at my search
>>>>>>> application with Kafka. Whenever a search request is
>>>>>>> done I create a ProducerRecord which holds current nano
>>>>>>> time of the system.
>>>>>>> 
>>>>>>> I know that I have to use a streaming API for
>>>>>>> calculation i.e. Kafka Streams or Spark Streams. My
>>>>>>> choice is to use Kafka Streams.
>>>>>>> 
>>>>>>> For last 1 hours, or since the beginning, I have to
>>>>>>> calculate the queries per second. How can I make such
>>>>>>> an aggregation at Kafka Streams?
>>>>>>> 
>>>>>>> Kind Regards, Furkan KAMACI
>>>>>>> 
>>>>> 
>>>> 
>> 
> 

Re: Kafka Streams Aggregate By Date

Posted by Furkan KAMACI <fu...@gmail.com>.
Hi Matthias,

I've tried this code:

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        final KStreamBuilder builder = new KStreamBuilder();
        final KStream input = builder.stream("myapp-test");

        final KStream<String, Long> searchCounts = input.countByKey("SearchRequests").toStream();
        searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).to("outputTopicHourlyCounts");

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

However I get an error:


Exception in thread "StreamThread-1" java.lang.ClassCastException:
org.apache.kafka.streams.kstream.Windowed cannot be cast to
java.lang.String

On the other hand when I try this code:

https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a

I get an error too which indicates that:

Exception in thread "StreamThread-1"
org.apache.kafka.common.errors.SerializationException: Size of data
received by LongDeserializer is not 8

Here is the generated topic:

        kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
        28952314828122
        28988681653726
        29080089383233

I know that I am missing something but couldn't find it.

Kind Regards,
Furkan KAMACI

On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

>
> I see. KGroupedStream will be part of 0.10.1.0 (should be release the
> next weeks).
>
> So, instead of
>
> > .groupByKey().count()
>
> you need to do
>
> > .countByKey()
>
>
>
> - -Matthias
>
> On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> > Hi Matthias,
> >
> > Thanks for your detailed answer. By the way I couldn't find
> > "KGroupedStream" at version of 0.10.0.1?
> >
> > Kind Regards, Furkan KAMACI
> >
> > On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> > <ma...@confluent.io> wrote:
> >
> > Hi,
> >
> > You just need to read you stream and apply an (windowed)
> > aggregation on it.
> >
> > If you use non-windowed aggregation you will get "since the
> > beginning". If you use windowed aggregation you can specify the
> > window size as 1 hour and get those results.
> >
> > One comment: it seems that you want to count *all* queries. To
> > make this work, you need to make sure all records are using the
> > same key (because Kafka Streams only supports aggregation over
> > keyed streams). Keep in mind, that this prohibits parallelization
> > of you aggregation!
> >
> > As a workaround, you could also do two consecutive aggregation, and
> > do parallelize the first one, and do not parallelize the second one
> > (ie, using the first one as a pre aggregation similar to a combine
> > step)
> >
> > Without pre aggregation and assuming all records use the same key
> > something like this (for current trunk):
> >
> >
> >>>> KStreamBuilder builder = new KStreamBuilder();
> >>>> KStream input = builder.stream("yourTopic");
> >>>>
> >>>> KGroupedStream groupedInput = input.groupByKey();
> >>>>
> >>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> >>>>
> >>>> groupedInput.count(TimeWindows.of(3600 * 1000),
> >>>>     "windowedCountStore").to("outputTopicHourlyCounts");
> >
> >
> > For more details, please see the docs and examples:
> >
> > - http://docs.confluent.io/current/streams/index.html
> > - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> >
> >
> > -Matthias
> >
> > On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> >>>> Hi,
> >>>>
> >>>> I could successfully run Kafka at my environment. I want to
> >>>> monitor Queries per Second at my search application with
> >>>> Kafka. Whenever a search request is done I create a
> >>>> ProducerRecord which holds current nano time of the system.
> >>>>
> >>>> I know that I have to use a streaming API for calculation
> >>>> i.e. Kafka Streams or Spark Streams. My choice is to use
> >>>> Kafka Streams.
> >>>>
> >>>> For last 1 hours, or since the beginning, I have to calculate
> >>>> the queries per second. How can I make such an aggregation at
> >>>> Kafka Streams?
> >>>>
> >>>> Kind Regards, Furkan KAMACI
> >>>>
> >>
> >
>

Re: Kafka Streams Aggregate By Date

Posted by "Matthias J. Sax" <ma...@confluent.io>.

I see. KGroupedStream will be part of 0.10.1.0 (it should be released in
the next weeks).

So, instead of

> .groupByKey().count()

you need to do

> .countByKey()
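
For the windowed hourly count, that difference looks roughly like this
(just a sketch, not tested; "stream" stands for your keyed
KStream<String, ...>, window sizes are in milliseconds, and the
store/window names are placeholders):

    // 0.10.0.x:
    KTable<Windowed<String>, Long> hourlyCounts =
        stream.countByKey(TimeWindows.of("Hourly", 3600 * 1000), Serdes.String());

    // 0.10.1.0 / current trunk:
    KTable<Windowed<String>, Long> hourlyCounts =
        stream.groupByKey().count(TimeWindows.of(3600 * 1000), "Hourly");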



-Matthias

On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> Hi Matthias,
> 
> Thanks for your detailed answer. By the way I couldn't find
> "KGroupedStream" at version of 0.10.0.1?
> 
> Kind Regards, Furkan KAMACI
> 
> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> <ma...@confluent.io> wrote:
> 
> Hi,
> 
> You just need to read you stream and apply an (windowed)
> aggregation on it.
> 
> If you use non-windowed aggregation you will get "since the 
> beginning". If you use windowed aggregation you can specify the
> window size as 1 hour and get those results.
> 
> One comment: it seems that you want to count *all* queries. To
> make this work, you need to make sure all records are using the
> same key (because Kafka Streams only supports aggregation over
> keyed streams). Keep in mind, that this prohibits parallelization
> of you aggregation!
> 
> As a workaround, you could also do two consecutive aggregation, and
> do parallelize the first one, and do not parallelize the second one
> (ie, using the first one as a pre aggregation similar to a combine
> step)
> 
> Without pre aggregation and assuming all records use the same key 
> something like this (for current trunk):
> 
> 
>>>> KStreamBuilder builder = new KStreamBuilder();
>>>> KStream input = builder.stream("yourTopic");
>>>> 
>>>> KGroupedStream groupedInput = input.groupByKey();
>>>> 
>>>> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
>>>> 
>>>> groupedInput.count(TimeWindows.of(3600 * 1000),
>>>>     "windowedCountStore").to("outputTopicHourlyCounts");
> 
> 
> For more details, please see the docs and examples:
> 
> - http://docs.confluent.io/current/streams/index.html
> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> 
> 
> -Matthias
> 
> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>> Hi,
>>>> 
>>>> I could successfully run Kafka at my environment. I want to
>>>> monitor Queries per Second at my search application with
>>>> Kafka. Whenever a search request is done I create a
>>>> ProducerRecord which holds current nano time of the system.
>>>> 
>>>> I know that I have to use a streaming API for calculation
>>>> i.e. Kafka Streams or Spark Streams. My choice is to use
>>>> Kafka Streams.
>>>> 
>>>> For last 1 hours, or since the beginning, I have to calculate
>>>> the queries per second. How can I make such an aggregation at
>>>> Kafka Streams?
>>>> 
>>>> Kind Regards, Furkan KAMACI
>>>> 
>> 
> 

Re: Kafka Streams Aggregate By Date

Posted by Furkan KAMACI <fu...@gmail.com>.
Hi Matthias,

Thanks for your detailed answer. By the way, I couldn't find "KGroupedStream"
in version 0.10.0.1?

Kind Regards,
Furkan KAMACI

On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

>
> Hi,
>
> You just need to read you stream and apply an (windowed) aggregation
> on it.
>
> If you use non-windowed aggregation you will get "since the
> beginning". If you use windowed aggregation you can specify the window
> size as 1 hour and get those results.
>
> One comment: it seems that you want to count *all* queries. To make
> this work, you need to make sure all records are using the same key
> (because Kafka Streams only supports aggregation over keyed streams).
> Keep in mind, that this prohibits parallelization of you aggregation!
>
> As a workaround, you could also do two consecutive aggregation, and do
> parallelize the first one, and do not parallelize the second one (ie,
> using the first one as a pre aggregation similar to a combine step)
>
> Without pre aggregation and assuming all records use the same key
> something like this (for current trunk):
>
>
> > KStreamBuilder builder = new KStreamBuilder();
> > KStream input = builder.stream("yourTopic");
> >
> > KGroupedStream groupedInput = input.groupByKey();
> >
> > groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> >
> > groupedInput.count(TimeWindows.of(3600 * 1000),
> >     "windowedCountStore").to("outputTopicHourlyCounts");
>
>
> For more details, please see the docs and examples:
>
>  - http://docs.confluent.io/current/streams/index.html
>  - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
>
>
> - -Matthias
>
> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> > Hi,
> >
> > I could successfully run Kafka at my environment. I want to monitor
> > Queries per Second at my search application with Kafka. Whenever a
> > search request is done I create a ProducerRecord which holds
> > current nano time of the system.
> >
> > I know that I have to use a streaming API for calculation i.e.
> > Kafka Streams or Spark Streams. My choice is to use Kafka Streams.
> >
> > For last 1 hours, or since the beginning, I have to calculate the
> > queries per second. How can I make such an aggregation at Kafka
> > Streams?
> >
> > Kind Regards, Furkan KAMACI
> >
>

Re: Kafka Streams Aggregate By Date

Posted by "Matthias J. Sax" <ma...@confluent.io>.

Hi,

You just need to read your stream and apply a (windowed) aggregation
on it.

If you use a non-windowed aggregation you will get "since the
beginning". If you use a windowed aggregation you can specify the window
size as 1 hour and get those results.

One comment: it seems that you want to count *all* queries. To make
this work, you need to make sure all records are using the same key
(because Kafka Streams only supports aggregation over keyed streams).
Keep in mind that this prohibits parallelization of your aggregation!

As a workaround, you could also do two consecutive aggregations and
parallelize the first one but not the second one (i.e., use the first
one as a pre-aggregation, similar to a combine step).

Without pre-aggregation, and assuming all records use the same key,
something like this (for current trunk):


> KStreamBuilder builder = new KStreamBuilder();
> KStream input = builder.stream("yourTopic");
> 
> KGroupedStream groupedInput = input.groupByKey();
> 
> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> 
> groupedInput.count(TimeWindows.of(3600 * 1000),
>     "windowedCountStore").to("outputTopicHourlyCounts");


For more details, please see the docs and examples:

 - http://docs.confluent.io/current/streams/index.html
 - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams


-Matthias

On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> Hi,
> 
> I could successfully run Kafka at my environment. I want to monitor
> Queries per Second at my search application with Kafka. Whenever a
> search request is done I create a ProducerRecord which holds
> current nano time of the system.
> 
> I know that I have to use a streaming API for calculation i.e.
> Kafka Streams or Spark Streams. My choice is to use Kafka Streams.
> 
> For last 1 hours, or since the beginning, I have to calculate the
> queries per second. How can I make such an aggregation at Kafka
> Streams?
> 
> Kind Regards, Furkan KAMACI
> 