Posted to users@kafka.apache.org by Matt Daum <ma...@setfive.com> on 2018/03/02 14:24:31 UTC

Kafka Setup for Daily counts on wide array of keys

I am new to Kafka, but I think I have a good use case for it.  I am trying
to build daily counts of requests based on a number of different attributes
in a high-throughput system (~1 million requests/sec across all 8
servers).  The different attributes are unbounded in terms of values, and
some will span hundreds of millions of values.  This is my current
thought process - let me know where I could be more efficient or if there is
a better way to do it.

I'll create an Avro object "Impression" which has all the attributes of the
inbound request.  My application servers will then, on each request, create
one of these and send it to a single Kafka topic.
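
For illustration, that per-request produce path could look roughly like the sketch below (the Impression class, topic name, and serializer choices here are assumptions, not an existing setup):

    import java.io.ByteArrayOutputStream;
    import java.util.Properties;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ImpressionProducer {
        // Impression is assumed to be an Avro-generated SpecificRecord class.
        private final SpecificDatumWriter<Impression> writer =
            new SpecificDatumWriter<>(Impression.class);
        private final KafkaProducer<String, byte[]> producer;

        public ImpressionProducer(String bootstrapServers) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.producer = new KafkaProducer<>(props);
        }

        // Called once per inbound request: encode the record and fire-and-forget.
        public void send(Impression impression) throws Exception {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(impression, encoder);
            encoder.flush();
            producer.send(new ProducerRecord<>("impressions", out.toByteArray()));
        }
    }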

I'll then have a consumer which creates a stream from the topic.  From
there I'll use the windowed timeframes and groupBy to group by the
attributes on each given day.  At the end of the day I'd need to read out
the data store to an external system for storage.  Since I won't know all
the values, I'd need something similar to KVStore.all() but for
WindowedKV stores.  It appears this will be possible in 1.1 with this
commit:
https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5
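
As a rough sketch of what that topology might look like against the 1.1 Streams API (the attribute extraction, store name, serdes, and the exportToExternalSystem() call are all placeholders I'm assuming, not a finished design):

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;
    import org.apache.kafka.streams.state.WindowStore;

    public class DailyCounts {

        public static KafkaStreams start(Properties streamsConfig) {
            StreamsBuilder builder = new StreamsBuilder();
            // Assumes an Avro serde for Impression is configured as the default value serde.
            KStream<String, Impression> impressions = builder.stream("impressions");

            impressions
                .groupBy((key, imp) -> imp.getAttributeX())             // re-key by the counted attribute
                .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)))  // 1-day tumbling windows
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("daily-counts"));

            KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
            streams.start();
            return streams;
        }

        // End of day: iterate the whole windowed store (the all()/fetchAll() methods
        // that the 1.1 commit above adds) and push each count to the external system.
        public static void exportCounts(KafkaStreams streams) {
            ReadOnlyWindowStore<String, Long> store =
                streams.store("daily-counts", QueryableStoreTypes.<String, Long>windowStore());
            try (KeyValueIterator<Windowed<String>, Long> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<Windowed<String>, Long> entry = it.next();
                    exportToExternalSystem(entry.key.key(), entry.key.window().start(), entry.value);
                }
            }
        }

        private static void exportToExternalSystem(String value, long windowStart, long count) {
            // placeholder for the write to the external storage system
        }
    }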

Is this the best approach?  Or would I be better off using the stream to
listen and an external DB like Aerospike to store the counts, then reading
out of it directly at the end of the day?

Thanks for the help!
Daum

Re: Kafka Setup for Daily counts on wide array of keys

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
Sorry Matt, I don’t have much idea about Kafka streaming (or any streaming for that matter).
As for saving counts from your application servers to Aerospike directly, that is certainly simpler, requiring less hardware, fewer resources, and less development effort.

One reason some people use Kafka as part of their pipeline is to decouple systems and protect either end from issues in the other.
It usually makes maintenance on either end simpler. Furthermore, it acts as a dampening buffer and, because of Kafka's low latency and high throughput (well, that's a relative term), allows the producers and consumers to run at their full potential (kind of, but not exactly, async push and pull of data).

It might even be worthwhile to start off without Kafka and, once you understand things better, introduce Kafka later on.

From: Matt Daum <ma...@setfive.com>
Date: Monday, March 5, 2018 at 4:33 PM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Cc: "users@kafka.apache.org" <us...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

And not to overthink this, but as I'm new to Kafka and streams I want to make sure that it makes the most sense for my use case.  With the streams and grouping, it looks like I'd get one internal topic created per grouped stream, which would then be written, re-read, and totaled in the count, and that would produce a final stream which is then consumed/sinked to an external DB.  Is that correct?

Overall I'm not using the streaming counts as they grow throughout the day; I just want a final end-of-day count.  We already have an Aerospike cluster set up for the applications themselves.  If each application server itself wrote to the Aerospike DB cluster to simply increase the counts for each attribute, and we then read them out there at the end of the day, it appears fewer computing resources would be used, as we'd effectively be doing one DB write per counted attribute per inbound request.
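
As a sketch of that alternative, with a made-up CounterClient interface standing in for whatever increment call the datastore actually exposes (this is not the Aerospike client API):

    import java.time.LocalDate;

    // Hypothetical increment-style client; the real datastore call would differ.
    interface CounterClient {
        void increment(String counterKey, long delta);
    }

    class RequestCounter {
        private final CounterClient db;

        RequestCounter(CounterClient db) {
            this.db = db;
        }

        // One increment per counted attribute, keyed by day + attribute name + value.
        void onRequest(Impression imp) {
            String day = LocalDate.now().toString();
            db.increment(day + "|attributeX|" + imp.getAttributeX(), 1);
            db.increment(day + "|attributeY|" + imp.getAttributeY(), 1);
        }
    }

At the end of the day the counts are read back out of the datastore by key (or key prefix), with no intermediate topics involved.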

I am not saying that is the better route, as I don't fully know or understand the full capabilities of Kafka.  Since we aren't streaming the data, enriching it, etc., would the direct-to-DB counts be a better approach?  I just want to make sure I use the best tool for the job.  Let me know what other factors I may be underestimating or misunderstanding in the Kafka approach.  I want to be as informed as possible before going down either path too far.

Thank you again for your time,
Matt

On Mon, Mar 5, 2018 at 3:14 PM, Thakrar, Jayesh <jt...@conversantmedia.com> wrote:
Yep, exactly.

So there is some buffering that you need to do in your client, and you also need to deal with edge cases,
e.g. how long you should hold on to a batch before you send a smaller batch to the producer, since you want a balance between batch optimization and expedience.

You may need to do some experiments to balance between system throughput, record size, batch size and potential batching delay for a given rate of incoming requests.
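
A minimal sketch of that size-or-time bound (the thresholds are arbitrary placeholders to tune):

    import java.util.ArrayList;
    import java.util.List;

    // Flush when the batch reaches MAX_RECORDS, or when the oldest buffered record
    // has waited longer than MAX_WAIT_MS - whichever comes first.
    abstract class RequestBatcher {
        private static final int MAX_RECORDS = 500;   // placeholder threshold
        private static final long MAX_WAIT_MS = 200;  // placeholder delay bound
        private final List<Impression> buffer = new ArrayList<>();
        private long oldestRecordMs = -1;

        synchronized void add(Impression imp) {
            if (buffer.isEmpty()) {
                oldestRecordMs = System.currentTimeMillis();
            }
            buffer.add(imp);
            if (buffer.size() >= MAX_RECORDS) {
                flush();
            }
        }

        // Called from a periodic task to cover the edge case of a slow trickle of
        // requests that never fills a full batch.
        synchronized void maybeFlushOnTimeout() {
            if (!buffer.isEmpty()
                    && System.currentTimeMillis() - oldestRecordMs >= MAX_WAIT_MS) {
                flush();
            }
        }

        private void flush() {
            sendBatchToProducer(new ArrayList<>(buffer));
            buffer.clear();
        }

        // Serialize the batch and hand it to the Kafka producer.
        abstract void sendBatchToProducer(List<Impression> batch);
    }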


From: Matt Daum <ma...@setfive.com>
Date: Monday, March 5, 2018 at 1:59 PM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Cc: "users@kafka.apache.org" <us...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Ah, good call - so you really have an Avro wrapper around your single class, right?  I.e. an array of records, correct?  Then when you hit a size you are happy with, you send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <jt...@conversantmedia.com> wrote:
Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in converting a request record into Avro and producing (generating) a Kafka message out of it.
For requests of size 100-200 bytes, that can be a substantial amount - especially since you will be bundling the Avro schema with each request in its Kafka message.

By batching the requests, you are significantly amortizing that overhead across many rows.
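
For concreteness, one way to do that batching is to write N records into a single Avro container, so the schema is written once per Kafka message instead of once per request; a sketch, assuming the same generated Impression class:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.List;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.specific.SpecificDatumWriter;

    class ImpressionBatchSerializer {

        // Serialize a whole batch into one byte[] that becomes a single Kafka message value.
        static byte[] serialize(List<Impression> batch) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (DataFileWriter<Impression> fileWriter =
                     new DataFileWriter<>(new SpecificDatumWriter<>(Impression.class))) {
                fileWriter.create(Impression.getClassSchema(), out);  // schema written once here
                for (Impression imp : batch) {
                    fileWriter.append(imp);
                }
            }
            return out.toByteArray();
        }
    }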

From: Matt Daum <ma...@setfive.com>
Date: Monday, March 5, 2018 at 5:54 AM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Cc: "users@kafka.apache.org" <us...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example would be assume on each requests there is a parameters "X" which at the end of each day I want to know the counts per unique value, it could have 100's of millions of possible values.

I'll start to hopefully work this week on an initial test of everything and will report back.  A few last questions if you have the time:
- For the batching of the Avro files, would this be different from the Producer batching?
- Any other things you'd suggest looking out for as gotchas, or configurations that would probably be good to tweak further?

Thanks!
Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <jt...@conversantmedia.com> wrote:
BTW - I did not mean to rule out Aerospike as a possible datastore.
It's just that I am not familiar with it, but it surely looks like a good candidate to store the raw and/or aggregated data, given that it also has a Kafka Connect module.

From: "Thakrar, Jayesh" <jt...@conversantmedia.com>>
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum <ma...@setfive.com>>

Cc: "users@kafka.apache.org<ma...@kafka.apache.org>" <us...@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don't have any experience/knowledge of the Kafka inbuilt datastore, but I believe that for some
portions of streaming, Kafka uses (used?) RocksDB to locally store some state info.

Personally I would use an external datastore.
There's a wide choice out there - from regular key-value stores like Cassandra, ScyllaDB, and RocksDB, and timeseries key-value stores like InfluxDB, to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore completely (if appropriate) and store the raw data on HDFS, organized by (say) date+hour,
by using periodic (minute to hourly) extract jobs that store data in a Hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do compaction on data, which leads to unnecessary reads and writes (referred to as write amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Of course, with that approach you lose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - I'm not sure what you mean by that.
Is it that each record has some fields that represent different kinds of attributes and that their domain can have millions to hundreds of millions of values?
I don't think that should matter.

From: Matt Daum <ma...@setfive.com>
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Cc: "users@kafka.apache.org" <us...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks!  For the counts, I'd need to use a global table to make sure it's across all the data, right?  Also, will having millions of different values per grouped attribute scale OK?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jt...@conversantmedia.com> wrote:
Yes, that's the general design pattern. Another thing to look into is compressing the data. The Kafka consumer/producer can already do it for you, but we chose to compress in the applications due to a historic issue that degraded performance, although it has been resolved now.
Also, just keep in mind that while you do your batching, the Kafka producer also tries to batch msgs to Kafka, and you will need to ensure you have enough buffer memory. However, that's all configurable.
Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher.
Jayesh
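
The producer-side batching mentioned above is controlled by standard producer configs; a hedged example of the knobs involved (the values are placeholders to tune, not recommendations):

    import java.util.Properties;

    class ProducerSettings {
        static Properties batchingTuned(String bootstrapServers) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("batch.size", "131072");       // max bytes per producer batch, per partition
            props.put("linger.ms", "50");            // wait up to 50 ms to fill a batch before sending
            props.put("buffer.memory", "67108864");  // memory for records queued but not yet sent
            props.put("compression.type", "lz4");    // or snappy/gzip; applied per producer batch
            return props;
        }
    }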

________________________________
From: Matt Daum <ma...@setfive.com>
Sent: Sunday, March 4, 2018 7:06:19 AM
To: Thakrar, Jayesh
Cc: users@kafka.apache.org
Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a Kafka cluster set up yet at all.  Right now we just have 8 of our application servers.  We currently sample some impressions and then dedupe/count them at a different DC, but we are looking to try to analyze all impressions for some overall analytics.

Our requests are around 100-200 bytes each.  If we lost some of them due to network jitter etc. it would be fine; we're just trying to get a rough overall count of each attribute.  Creating batched messages definitely makes sense and will also cut down on the network IO.

We're trying to determine the required setup for Kafka to do what we're looking to do, as these are physical servers so we'll most likely need to buy new hardware.  For the first run I think we'll try it out on one of our application clusters that gets a smaller amount of traffic (300-400k req/sec) and run the Kafka cluster on the same machines as the applications.

So would the best route here be something like: each application server batches requests and sends them to Kafka, a stream consumer then tallies up the totals per attribute that we want to track and outputs that to a new topic, which then goes to a sink to either a DB or something like S3, which we then read into our external DBs?

Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh <jt...@conversantmedia.com> wrote:
Matt,

If I understand correctly, you have an 8 node Kafka cluster and need to support about 1 million requests/sec into the cluster from source servers, and expect to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to achieve the desired throughput.

So e.g. on the request-receiving systems, I would suggest creating a logical Avro file (byte buffer) of, say, N requests and then making that into one Kafka msg payload.

We have a similar situation (https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found anywhere from 4x to 10x better throughput with batching as compared to one request per msg.
We have different kinds of msgs/topics, and the individual "request" size varies from about 100 bytes to 1+ KB.

Re: Kafka Setup for Daily counts on wide array of keys

Posted by Matt Daum <ma...@setfive.com>.
And not to overthink this, but as I'm new to Kafka and streams I want to
make sure that it makes the most sense to for my use case.  With the
streams and grouping, it looks like I'd be getting at 1 internal topic
created per grouped stream which then would written and reread then totaled
in the count, then would have that produce a final stream which is then
consumed/sinked to an external db.   Is that correct?

Overall I'm not using the streaming counts as it grows throughout the day,
but just want a final end of day count.  We already have an Aerospike
cluster setup for the applications themselves.  If each application server
itself made the writes to the Aerospike DB cluster to simply increase the
counts for each attribute then at end of day read it out there it appears
it'd be less computing resources used.  As we'd effectively be doing
inbound request -> DB write per counted attribute.

I am not saying that is the better route as I'm I don't fully know or
understand the full capabilities of Kafka.  Since we aren't streaming the
data, enriching it, etc. would the direct to DB counts be a better
approach?  I just want to make sure I use the best tool for the job.    Let
me know what other factors I may be underestimating/misunderstanding on the
Kafka approach please.  I want to be informed as possible before going down
either path too far.

Thank you again for your time,
Matt

On Mon, Mar 5, 2018 at 3:14 PM, Thakrar, Jayesh <
jthakrar@conversantmedia.com> wrote:

> Yep, exactly.
>
>
>
> So there is some buffering that you need to do in your client and also
> deal with edge cases.
>
> E.g. how long should you hold on to a batch before you send a smaller
> batch to producer since you want a balance between batch optimization and
> expedience.
>
>
>
> You may need to do some experiments to balance between system throughput,
> record size, batch size and potential batching delay for a given rate of
> incoming requests.
>
>
>
>
>
> *From: *Matt Daum <ma...@setfive.com>
> *Date: *Monday, March 5, 2018 at 1:59 PM
>
> *To: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
> *Cc: *"users@kafka.apache.org" <us...@kafka.apache.org>
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Ah good call, so you are really having an AVRO wrapper around your single
> class right?  IE an array of records, correct?  Then when you hit a size
> you are happy you send it to the producer?
>
>
>
> On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <
> jthakrar@conversantmedia.com> wrote:
>
> Good luck on your test!
>
>
>
> As for the batching within Avro and by Kafka Producer, here are my
> thoughts without any empirical proof.
>
> There is a certain amount of overhead in terms of execution AND bytes in
> converting a request record into Avro and producing (generating) a Kafka
> message out of it.
>
> For requests of size 100-200 bytes, that can be a substantial amount -
> especially the fact that you will be bundling the Avro schema for each
> request in its Kafka message.
>
>
>
> By batching the requests, you are significantly amortizing that overhead
> across many rows.
>
>
>
> *From: *Matt Daum <ma...@setfive.com>
> *Date: *Monday, March 5, 2018 at 5:54 AM
>
>
> *To: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
> *Cc: *"users@kafka.apache.org" <us...@kafka.apache.org>
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Thanks for the suggestions!  It does look like it's using local RocksDB
> stores for the state info by default.  Will look into using an external
> one.
>
>
>
> As for the "millions of different values per grouped attribute" an example
> would be assume on each requests there is a parameters "X" which at the end
> of each day I want to know the counts per unique value, it could have 100's
> of millions of possible values.
>
>
>
> I'll start to hopefully work this week on an initial test of everything
> and will report back.  A few last questions if you have the time:
>
> - For the batching of the AVRO files, would this be different than the
> Producer batching?
>
> - Any other things you'd suggest looking out for as gotcha's or
> configurations that probably will be good to tweak further?
>
>
>
> Thanks!
>
> Matt
>
>
>
> On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <
> jthakrar@conversantmedia.com> wrote:
>
> BTW - I did not mean to rule-out Aerospike as a possible datastore.
>
> Its just that I am not familiar with it, but surely looks like a good
> candidate to store the raw and/or aggregated data, given that it also has a
> Kafka Connect module.
>
>
>
> *From: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
> *Date: *Sunday, March 4, 2018 at 9:25 PM
> *To: *Matt Daum <ma...@setfive.com>
>
>
> *Cc: *"users@kafka.apache.org" <us...@kafka.apache.org>
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> I don’t have any experience/knowledge on the Kafka inbuilt datastore, but
> believe thatfor some
>
> portions of streaming Kafka uses (used?) RocksDB to locally store some
> state info in the brokers.
>
>
>
> Personally  I would use an external datastore.
>
> There's a wide choice out there - regular key-value stores like Cassandra,
> ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular
> RDBMSes.
>
> If you have hadoop in the picture, its even possible to bypass a datastore
> completely (if appropriate) and store the raw data on HDFS organized by
> (say) date+hour
>
> by using periodic (minute to hourly) extract jobs and store data in
> hive-compatible directory structure using ORC or Parquet.
>
>
>
> The reason for shying away from NoSQL datastores is their tendency to do
> compaction on data which leads to unnecessary reads and writes (referred to
> as write-amplification).
>
> With periodic jobs in Hadoop, you (usually) write your data once only.
> Ofcourse with that approach you loose the "random/keyed access" to the
> data,
>
> but if you are only interested in the aggregations across various
> dimensions, those can be stored in a SQL/NoSQL datastore.
>
>
>
> As for "having millions of different values per grouped attribute" - not
> sure what you mean by them.
>
> Is it that each record has some fields that represent different kinds of
> attributes and that their domain can have millions to hundreds of millions
> of values?
>
> I don't think that should matter.
>
>
>
> *From: *Matt Daum <ma...@setfive.com>
> *Date: *Sunday, March 4, 2018 at 2:39 PM
> *To: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
> *Cc: *"users@kafka.apache.org" <us...@kafka.apache.org>
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Thanks! For the counts I'd need to use a global table to make sure it's
> across all the data right?   Also having millions of different values per
> grouped attribute will scale ok?
>
>
>
> On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jt...@conversantmedia.com>
> wrote:
>
> Yes, that's the general design pattern. Another thing to look into is to
> compress the data. Now Kafka consumer/producer can already do it for you,
> but we choose to compress in the applications due to a historic issue that
> drgraded performance,  although it has been resolved now.
>
> Also,  just keep in mind that while you do your batching, kafka producer
> also tries to batch msgs to Kafka, and you will need to ensure you have
> enough buffer memory. However that's all configurable.
>
> Finally ensure you have the latest java updates and have kafka 0.10.2 or
> higher.
>
> Jayesh
>
>
> ------------------------------
>
> *From:* Matt Daum <ma...@setfive.com>
> *Sent:* Sunday, March 4, 2018 7:06:19 AM
> *To:* Thakrar, Jayesh
> *Cc:* users@kafka.apache.org
> *Subject:* Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> We actually don't have a kafka cluster setup yet at all.  Right now just
> have 8 of our application servers.  We currently sample some impressions
> and then dedupe/count outside at a different DC, but are looking to try to
> analyze all impressions for some overall analytics.
>
>
>
> Our requests are around 100-200 bytes each.  If we lost some of them due
> to network jitter etc. it would be fine we're trying to just get overall a
> rough count of each attribute.  Creating batched messages definitely makes
> sense and will also cut down on the network IO.
>
>
>
> We're trying to determine the required setup for Kafka to do what we're
> looking to do as these are physical servers so we'll most likely need to
> buy new hardware.  For the first run I think we'll try it out on one of our
> application clusters that get a smaller amount traffic (300-400k req/sec)
> and run the kafka cluster on the same machines as the applications.
>
>
>
> So would the best route here be something like each application server
> batches requests, send it to kafka, have a stream consumer that then
> tallies up the totals per attribute that we want to track, output that to a
> new topic, which then goes to a sink to either a DB or something like S3
> which then we read into our external DBs?
>
>
>
> Thanks!
>
>
>
> On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh <
> jthakrar@conversantmedia.com> wrote:
>
> Matt,
>
> If I understand correctly, you have an 8 node Kafka cluster and need to
> support  about 1 million requests/sec into the cluster from source servers
> and expect to consume that for aggregation.
>
> How big are your msgs?
>
> I would suggest looking into batching multiple requests per single Kafka
> msg to achieve desired throughput.
>
> So e.g. on the request receiving systems, I would suggest creating a
> logical avro file (byte buffer) of say N requests and then making that into
> one Kafka msg payload.
>
> We have a similar situation (https://www.slideshare.net/JayeshThakrar/
> apacheconflumekafka2016) and found anything from 4x to 10x better
> throughput with batching as compared to one request per msg.
> We have different kinds of msgs/topics and the individual "request" size
> varies from  about 100 bytes to 1+ KB.
>
>
> On 3/2/18, 8:24 AM, "Matt Daum" <ma...@setfive.com> wrote:
>
>     I am new to Kafka but I think I have a good use case for it.  I am
> trying
>     to build daily counts of requests based on a number of different
> attributes
>     in a high throughput system (~1 million requests/sec. across all  8
>     servers).  The different attributes are unbounded in terms of values,
> and
>     some will spread across 100's of millions values.  This is my current
>     through process, let me know where I could be more efficient or if
> there is
>     a better way to do it.
>
>     I'll create an AVRO object "Impression" which has all the attributes
> of the
>     inbound request.  My application servers then will on each request
> create
>     and send this to a single kafka topic.
>
>     I'll then have a consumer which creates a stream from the topic.  From
>     there I'll use the windowed timeframes and groupBy to group by the
>     attributes on each given day.  At the end of the day I'd need to read
> out
>     the data store to an external system for storage.  Since I won't know
> all
>     the values I'd need something similar to the KVStore.all() but for
>     WindowedKV Stores.  This appears that it'd be possible in 1.1 with this
>     commit:
>     https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f1
> 0ca76bd0c5
>     .
>
>     Is this the best approach to doing this?  Or would I be better using
> the
>     stream to listen and then an external DB like Aerospike to store the
> counts
>     and read out of it directly end of day.
>
>     Thanks for the help!
>     Daum
>
>
>
>
>
>
>

Re: Kafka Setup for Daily counts on wide array of keys

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
Yep, exactly.

So there is some buffering that you need to do in your client and also deal with edge cases.
E.g. how long should you hold on to a batch before you send a smaller batch to producer since you want a balance between batch optimization and expedience.

You may need to do some experiments to balance between system throughput, record size, batch size and potential batching delay for a given rate of incoming requests.


From: Matt Daum <ma...@setfive.com>
Date: Monday, March 5, 2018 at 1:59 PM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Cc: "users@kafka.apache.org" <us...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Ah good call, so you are really having an AVRO wrapper around your single class right?  IE an array of records, correct?  Then when you hit a size you are happy you send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <jt...@conversantmedia.com>> wrote:
Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in converting a request record into Avro and producing (generating) a Kafka message out of it.
For requests of size 100-200 bytes, that can be a substantial amount - especially the fact that you will be bundling the Avro schema for each request in its Kafka message.

By batching the requests, you are significantly amortizing that overhead across many rows.

From: Matt Daum <ma...@setfive.com>>
Date: Monday, March 5, 2018 at 5:54 AM

To: "Thakrar, Jayesh" <jt...@conversantmedia.com>>
Cc: "users@kafka.apache.org<ma...@kafka.apache.org>" <us...@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example would be assume on each requests there is a parameters "X" which at the end of each day I want to know the counts per unique value, it could have 100's of millions of possible values.

I'll start to hopefully work this week on an initial test of everything and will report back.  A few last questions if you have the time:
- For the batching of the AVRO files, would this be different than the Producer batching?
- Any other things you'd suggest looking out for as gotcha's or configurations that probably will be good to tweak further?

Thanks!
Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <jt...@conversantmedia.com>> wrote:
BTW - I did not mean to rule-out Aerospike as a possible datastore.
Its just that I am not familiar with it, but surely looks like a good candidate to store the raw and/or aggregated data, given that it also has a Kafka Connect module.

From: "Thakrar, Jayesh" <jt...@conversantmedia.com>>
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum <ma...@setfive.com>>

Cc: "users@kafka.apache.org<ma...@kafka.apache.org>" <us...@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don’t have any experience/knowledge on the Kafka inbuilt datastore, but believe thatfor some
portions of streaming Kafka uses (used?) RocksDB to locally store some state info in the brokers.

Personally  I would use an external datastore.
There's a wide choice out there - regular key-value stores like Cassandra, ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular RDBMSes.
If you have hadoop in the picture, its even possible to bypass a datastore completely (if appropriate) and store the raw data on HDFS organized by (say) date+hour
by using periodic (minute to hourly) extract jobs and store data in hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do compaction on data which leads to unnecessary reads and writes (referred to as write-amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Ofcourse with that approach you loose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure what you mean by them.
Is it that each record has some fields that represent different kinds of attributes and that their domain can have millions to hundreds of millions of values?
I don't think that should matter.

From: Matt Daum <ma...@setfive.com>>
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>>
Cc: "users@kafka.apache.org<ma...@kafka.apache.org>" <us...@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across all the data right?   Also having millions of different values per grouped attribute will scale ok?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jt...@conversantmedia.com>> wrote:
Yes, that's the general design pattern. Another thing to look into is to compress the data. Now Kafka consumer/producer can already do it for you, but we choose to compress in the applications due to a historic issue that drgraded performance,  although it has been resolved now.
Also,  just keep in mind that while you do your batching, kafka producer also tries to batch msgs to Kafka, and you will need to ensure you have enough buffer memory. However that's all configurable.
Finally ensure you have the latest java updates and have kafka 0.10.2 or higher.
Jayesh

________________________________
From: Matt Daum <ma...@setfive.com>>
Sent: Sunday, March 4, 2018 7:06:19 AM
To: Thakrar, Jayesh
Cc: users@kafka.apache.org<ma...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a kafka cluster setup yet at all.  Right now just have 8 of our application servers.  We currently sample some impressions and then dedupe/count outside at a different DC, but are looking to try to analyze all impressions for some overall analytics.

Our requests are around 100-200 bytes each.  If we lost some of them due to network jitter etc. it would be fine we're trying to just get overall a rough count of each attribute.  Creating batched messages definitely makes sense and will also cut down on the network IO.

We're trying to determine the required setup for Kafka to do what we're looking to do as these are physical servers so we'll most likely need to buy new hardware.  For the first run I think we'll try it out on one of our application clusters that get a smaller amount traffic (300-400k req/sec) and run the kafka cluster on the same machines as the applications.

So would the best route here be something like each application server batches requests, send it to kafka, have a stream consumer that then tallies up the totals per attribute that we want to track, output that to a new topic, which then goes to a sink to either a DB or something like S3 which then we read into our external DBs?

Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh <jt...@conversantmedia.com>> wrote:
Matt,

If I understand correctly, you have an 8 node Kafka cluster and need to support  about 1 million requests/sec into the cluster from source servers and expect to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to achieve desired throughput.

So e.g. on the request receiving systems, I would suggest creating a logical avro file (byte buffer) of say N requests and then making that into one Kafka msg payload.

We have a similar situation (https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found anything from 4x to 10x better throughput with batching as compared to one request per msg.
We have different kinds of msgs/topics and the individual "request" size varies from  about 100 bytes to 1+ KB.

On 3/2/18, 8:24 AM, "Matt Daum" <ma...@setfive.com>> wrote:

    I am new to Kafka but I think I have a good use case for it.  I am trying
    to build daily counts of requests based on a number of different attributes
    in a high throughput system (~1 million requests/sec. across all  8
    servers).  The different attributes are unbounded in terms of values, and
    some will spread across 100's of millions values.  This is my current
    through process, let me know where I could be more efficient or if there is
    a better way to do it.

    I'll create an AVRO object "Impression" which has all the attributes of the
    inbound request.  My application servers then will on each request create
    and send this to a single kafka topic.

    I'll then have a consumer which creates a stream from the topic.  From
    there I'll use the windowed timeframes and groupBy to group by the
    attributes on each given day.  At the end of the day I'd need to read out
    the data store to an external system for storage.  Since I won't know all
    the values I'd need something similar to the KVStore.all() but for
    WindowedKV Stores.  This appears that it'd be possible in 1.1 with this
    commit:
    https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5
    .

    Is this the best approach to doing this?  Or would I be better using the
    stream to listen and then an external DB like Aerospike to store the counts
    and read out of it directly end of day.

    Thanks for the help!
    Daum




Re: Kafka Setup for Daily counts on wide array of keys

Posted by Matt Daum <ma...@setfive.com>.
Ah good call, so you are really having an AVRO wrapper around your single
class right?  IE an array of records, correct?  Then when you hit a size
you are happy you send it to the producer?

On Mon, Mar 5, 2018 at 12:07 PM, Thakrar, Jayesh <
jthakrar@conversantmedia.com> wrote:

> Good luck on your test!
>
>
>
> As for the batching within Avro and by Kafka Producer, here are my
> thoughts without any empirical proof.
>
> There is a certain amount of overhead in terms of execution AND bytes in
> converting a request record into Avro and producing (generating) a Kafka
> message out of it.
>
> For requests of size 100-200 bytes, that can be a substantial amount -
> especially the fact that you will be bundling the Avro schema for each
> request in its Kafka message.
>
>
>
> By batching the requests, you are significantly amortizing that overhead
> across many rows.
>
>
>
> *From: *Matt Daum <ma...@setfive.com>
> *Date: *Monday, March 5, 2018 at 5:54 AM
>
> *To: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
> *Cc: *"users@kafka.apache.org" <us...@kafka.apache.org>
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Thanks for the suggestions!  It does look like it's using local RocksDB
> stores for the state info by default.  Will look into using an external
> one.
>
>
>
> As for the "millions of different values per grouped attribute" an example
> would be assume on each requests there is a parameters "X" which at the end
> of each day I want to know the counts per unique value, it could have 100's
> of millions of possible values.
>
>
>
> I'll start to hopefully work this week on an initial test of everything
> and will report back.  A few last questions if you have the time:
>
> - For the batching of the AVRO files, would this be different than the
> Producer batching?
>
> - Any other things you'd suggest looking out for as gotcha's or
> configurations that probably will be good to tweak further?
>
>
>
> Thanks!
>
> Matt
>
>
>
> On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <
> jthakrar@conversantmedia.com> wrote:
>
> BTW - I did not mean to rule-out Aerospike as a possible datastore.
>
> Its just that I am not familiar with it, but surely looks like a good
> candidate to store the raw and/or aggregated data, given that it also has a
> Kafka Connect module.
>
>
>
> *From: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
> *Date: *Sunday, March 4, 2018 at 9:25 PM
> *To: *Matt Daum <ma...@setfive.com>
>
>
> *Cc: *"users@kafka.apache.org" <us...@kafka.apache.org>
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> I don’t have any experience/knowledge on the Kafka inbuilt datastore, but
> believe thatfor some
>
> portions of streaming Kafka uses (used?) RocksDB to locally store some
> state info in the brokers.
>
>
>
> Personally  I would use an external datastore.
>
> There's a wide choice out there - regular key-value stores like Cassandra,
> ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular
> RDBMSes.
>
> If you have hadoop in the picture, its even possible to bypass a datastore
> completely (if appropriate) and store the raw data on HDFS organized by
> (say) date+hour
>
> by using periodic (minute to hourly) extract jobs and store data in
> hive-compatible directory structure using ORC or Parquet.
>
>
>
> The reason for shying away from NoSQL datastores is their tendency to do
> compaction on data which leads to unnecessary reads and writes (referred to
> as write-amplification).
>
> With periodic jobs in Hadoop, you (usually) write your data once only.
> Ofcourse with that approach you loose the "random/keyed access" to the
> data,
>
> but if you are only interested in the aggregations across various
> dimensions, those can be stored in a SQL/NoSQL datastore.
>
>
>
> As for "having millions of different values per grouped attribute" - not
> sure what you mean by them.
>
> Is it that each record has some fields that represent different kinds of
> attributes and that their domain can have millions to hundreds of millions
> of values?
>
> I don't think that should matter.
>
>
>
> *From: *Matt Daum <ma...@setfive.com>
> *Date: *Sunday, March 4, 2018 at 2:39 PM
> *To: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
> *Cc: *"users@kafka.apache.org" <us...@kafka.apache.org>
> *Subject: *Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> Thanks! For the counts I'd need to use a global table to make sure it's
> across all the data right?   Also having millions of different values per
> grouped attribute will scale ok?
>
>
>
> On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jt...@conversantmedia.com>
> wrote:
>
> Yes, that's the general design pattern. Another thing to look into is to
> compress the data. Now Kafka consumer/producer can already do it for you,
> but we choose to compress in the applications due to a historic issue that
> drgraded performance,  although it has been resolved now.
>
> Also,  just keep in mind that while you do your batching, kafka producer
> also tries to batch msgs to Kafka, and you will need to ensure you have
> enough buffer memory. However that's all configurable.
>
> Finally ensure you have the latest java updates and have kafka 0.10.2 or
> higher.
>
> Jayesh
>
>
> ------------------------------
>
> *From:* Matt Daum <ma...@setfive.com>
> *Sent:* Sunday, March 4, 2018 7:06:19 AM
> *To:* Thakrar, Jayesh
> *Cc:* users@kafka.apache.org
> *Subject:* Re: Kafka Setup for Daily counts on wide array of keys
>
>
>
> We actually don't have a kafka cluster setup yet at all.  Right now just
> have 8 of our application servers.  We currently sample some impressions
> and then dedupe/count outside at a different DC, but are looking to try to
> analyze all impressions for some overall analytics.
>
>
>
> Our requests are around 100-200 bytes each.  If we lost some of them due
> to network jitter etc. it would be fine we're trying to just get overall a
> rough count of each attribute.  Creating batched messages definitely makes
> sense and will also cut down on the network IO.
>
>
>
> We're trying to determine the required setup for Kafka to do what we're
> looking to do as these are physical servers so we'll most likely need to
> buy new hardware.  For the first run I think we'll try it out on one of our
> application clusters that get a smaller amount traffic (300-400k req/sec)
> and run the kafka cluster on the same machines as the applications.
>
>
>
> So would the best route here be something like each application server
> batches requests, send it to kafka, have a stream consumer that then
> tallies up the totals per attribute that we want to track, output that to a
> new topic, which then goes to a sink to either a DB or something like S3
> which then we read into our external DBs?
>
>
>
> Thanks!
>
>
>
> On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh <
> jthakrar@conversantmedia.com> wrote:
>
> Matt,
>
> If I understand correctly, you have an 8 node Kafka cluster and need to
> support  about 1 million requests/sec into the cluster from source servers
> and expect to consume that for aggregation.
>
> How big are your msgs?
>
> I would suggest looking into batching multiple requests per single Kafka
> msg to achieve desired throughput.
>
> So e.g. on the request receiving systems, I would suggest creating a
> logical avro file (byte buffer) of say N requests and then making that into
> one Kafka msg payload.
>
> We have a similar situation (https://www.slideshare.net/JayeshThakrar/
> apacheconflumekafka2016) and found anything from 4x to 10x better
> throughput with batching as compared to one request per msg.
> We have different kinds of msgs/topics and the individual "request" size
> varies from  about 100 bytes to 1+ KB.
>
>
> On 3/2/18, 8:24 AM, "Matt Daum" <ma...@setfive.com> wrote:
>
>     I am new to Kafka but I think I have a good use case for it.  I am
> trying
>     to build daily counts of requests based on a number of different
> attributes
>     in a high throughput system (~1 million requests/sec. across all  8
>     servers).  The different attributes are unbounded in terms of values,
> and
>     some will spread across 100's of millions values.  This is my current
>     through process, let me know where I could be more efficient or if
> there is
>     a better way to do it.
>
>     I'll create an AVRO object "Impression" which has all the attributes
> of the
>     inbound request.  My application servers then will on each request
> create
>     and send this to a single kafka topic.
>
>     I'll then have a consumer which creates a stream from the topic.  From
>     there I'll use the windowed timeframes and groupBy to group by the
>     attributes on each given day.  At the end of the day I'd need to read
> out
>     the data store to an external system for storage.  Since I won't know
> all
>     the values I'd need something similar to the KVStore.all() but for
>     WindowedKV Stores.  This appears that it'd be possible in 1.1 with this
>     commit:
>     https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f1
> 0ca76bd0c5
>     .
>
>     Is this the best approach to doing this?  Or would I be better using
> the
>     stream to listen and then an external DB like Aerospike to store the
> counts
>     and read out of it directly end of day.
>
>     Thanks for the help!
>     Daum
>
>
>
>
>

Re: Kafka Setup for Daily counts on wide array of keys

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
Good luck on your test!

As for the batching within Avro and by Kafka Producer, here are my thoughts without any empirical proof.
There is a certain amount of overhead in terms of execution AND bytes in converting a request record into Avro and producing (generating) a Kafka message out of it.
For requests of size 100-200 bytes, that can be a substantial amount - especially the fact that you will be bundling the Avro schema for each request in its Kafka message.

By batching the requests, you are significantly amortizing that overhead across many rows.

From: Matt Daum <ma...@setfive.com>
Date: Monday, March 5, 2018 at 5:54 AM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Cc: "users@kafka.apache.org" <us...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks for the suggestions!  It does look like it's using local RocksDB stores for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example would be assume on each requests there is a parameters "X" which at the end of each day I want to know the counts per unique value, it could have 100's of millions of possible values.

I'll start to hopefully work this week on an initial test of everything and will report back.  A few last questions if you have the time:
- For the batching of the AVRO files, would this be different than the Producer batching?
- Any other things you'd suggest looking out for as gotcha's or configurations that probably will be good to tweak further?

Thanks!
Matt

On Sun, Mar 4, 2018 at 11:23 PM, Thakrar, Jayesh <jt...@conversantmedia.com>> wrote:
BTW - I did not mean to rule-out Aerospike as a possible datastore.
Its just that I am not familiar with it, but surely looks like a good candidate to store the raw and/or aggregated data, given that it also has a Kafka Connect module.

From: "Thakrar, Jayesh" <jt...@conversantmedia.com>>
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum <ma...@setfive.com>>

Cc: "users@kafka.apache.org<ma...@kafka.apache.org>" <us...@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don’t have any experience/knowledge on the Kafka inbuilt datastore, but believe thatfor some
portions of streaming Kafka uses (used?) RocksDB to locally store some state info in the brokers.

Personally  I would use an external datastore.
There's a wide choice out there - regular key-value stores like Cassandra, ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular RDBMSes.
If you have hadoop in the picture, its even possible to bypass a datastore completely (if appropriate) and store the raw data on HDFS organized by (say) date+hour
by using periodic (minute to hourly) extract jobs and store data in hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do compaction on data which leads to unnecessary reads and writes (referred to as write-amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Ofcourse with that approach you loose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure what you mean by them.
Is it that each record has some fields that represent different kinds of attributes and that their domain can have millions to hundreds of millions of values?
I don't think that should matter.

From: Matt Daum <ma...@setfive.com>>
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>>
Cc: "users@kafka.apache.org<ma...@kafka.apache.org>" <us...@kafka.apache.org>>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across all the data right?   Also having millions of different values per grouped attribute will scale ok?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jt...@conversantmedia.com>> wrote:
Yes, that's the general design pattern. Another thing to look into is to compress the data. Now Kafka consumer/producer can already do it for you, but we choose to compress in the applications due to a historic issue that drgraded performance,  although it has been resolved now.
Also,  just keep in mind that while you do your batching, kafka producer also tries to batch msgs to Kafka, and you will need to ensure you have enough buffer memory. However that's all configurable.
Finally ensure you have the latest java updates and have kafka 0.10.2 or higher.
Jayesh

________________________________
From: Matt Daum <ma...@setfive.com>>
Sent: Sunday, March 4, 2018 7:06:19 AM
To: Thakrar, Jayesh
Cc: users@kafka.apache.org<ma...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a kafka cluster setup yet at all.  Right now just have 8 of our application servers.  We currently sample some impressions and then dedupe/count outside at a different DC, but are looking to try to analyze all impressions for some overall analytics.

Our requests are around 100-200 bytes each.  If we lost some of them due to network jitter etc. it would be fine we're trying to just get overall a rough count of each attribute.  Creating batched messages definitely makes sense and will also cut down on the network IO.

We're trying to determine the required setup for Kafka to do what we're looking to do as these are physical servers so we'll most likely need to buy new hardware.  For the first run I think we'll try it out on one of our application clusters that get a smaller amount traffic (300-400k req/sec) and run the kafka cluster on the same machines as the applications.

So would the best route here be something like each application server batches requests, send it to kafka, have a stream consumer that then tallies up the totals per attribute that we want to track, output that to a new topic, which then goes to a sink to either a DB or something like S3 which then we read into our external DBs?

Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh <jt...@conversantmedia.com>> wrote:
Matt,

If I understand correctly, you have an 8 node Kafka cluster and need to support  about 1 million requests/sec into the cluster from source servers and expect to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to achieve desired throughput.

So e.g. on the request receiving systems, I would suggest creating a logical avro file (byte buffer) of say N requests and then making that into one Kafka msg payload.

We have a similar situation (https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found anything from 4x to 10x better throughput with batching as compared to one request per msg.
We have different kinds of msgs/topics and the individual "request" size varies from  about 100 bytes to 1+ KB.

On 3/2/18, 8:24 AM, "Matt Daum" <ma...@setfive.com>> wrote:

    I am new to Kafka but I think I have a good use case for it.  I am trying
    to build daily counts of requests based on a number of different attributes
    in a high throughput system (~1 million requests/sec. across all  8
    servers).  The different attributes are unbounded in terms of values, and
    some will spread across 100's of millions values.  This is my current
    through process, let me know where I could be more efficient or if there is
    a better way to do it.

    I'll create an AVRO object "Impression" which has all the attributes of the
    inbound request.  My application servers then will on each request create
    and send this to a single kafka topic.

    I'll then have a consumer which creates a stream from the topic.  From
    there I'll use the windowed timeframes and groupBy to group by the
    attributes on each given day.  At the end of the day I'd need to read out
    the data store to an external system for storage.  Since I won't know all
    the values I'd need something similar to the KVStore.all() but for
    WindowedKV Stores.  This appears that it'd be possible in 1.1 with this
    commit:
    https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5
    .

    Is this the best approach to doing this?  Or would I be better using the
    stream to listen and then an external DB like Aerospike to store the counts
    and read out of it directly end of day.

    Thanks for the help!
    Daum




Re: Kafka Setup for Daily counts on wide array of keys

Posted by Matt Daum <ma...@setfive.com>.
Thanks for the suggestions!  It does look like it's using local RocksDB
stores for the state info by default.  Will look into using an external one.

As for the "millions of different values per grouped attribute" an example
would be assume on each requests there is a parameters "X" which at the end
of each day I want to know the counts per unique value, it could have 100's
of millions of possible values.

I'll start to hopefully work this week on an initial test of everything and
will report back.  A few last questions if you have the time:
- For the batching of the AVRO files, would this be different than the
Producer batching?
- Any other things you'd suggest looking out for as gotcha's or
configurations that probably will be good to tweak further?

Thanks!
Matt


Re: Kafka Setup for Daily counts on wide array of keys

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
BTW - I did not mean to rule out Aerospike as a possible datastore.
It's just that I am not familiar with it, but it certainly looks like a good candidate to store the raw and/or aggregated data, given that it also has a Kafka Connect module.

From: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Date: Sunday, March 4, 2018 at 9:25 PM
To: Matt Daum <ma...@setfive.com>
Cc: "users@kafka.apache.org" <us...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

I don’t have any experience/knowledge on the Kafka inbuilt datastore, but believe thatfor some
portions of streaming Kafka uses (used?) RocksDB to locally store some state info in the brokers.

Personally  I would use an external datastore.
There's a wide choice out there - regular key-value stores like Cassandra, ScyllaDB, RocksDB, timeseries key-value stores like InfluxDB to regular RDBMSes.
If you have hadoop in the picture, its even possible to bypass a datastore completely (if appropriate) and store the raw data on HDFS organized by (say) date+hour
by using periodic (minute to hourly) extract jobs and store data in hive-compatible directory structure using ORC or Parquet.

The reason for shying away from NoSQL datastores is their tendency to do compaction on data which leads to unnecessary reads and writes (referred to as write-amplification).
With periodic jobs in Hadoop, you (usually) write your data once only. Ofcourse with that approach you loose the "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure what you mean by them.
Is it that each record has some fields that represent different kinds of attributes and that their domain can have millions to hundreds of millions of values?
I don't think that should matter.

From: Matt Daum <ma...@setfive.com>
Date: Sunday, March 4, 2018 at 2:39 PM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Cc: "users@kafka.apache.org" <us...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

Thanks! For the counts I'd need to use a global table to make sure it's across all the data right?   Also having millions of different values per grouped attribute will scale ok?

On Mar 4, 2018 8:45 AM, "Thakrar, Jayesh" <jt...@conversantmedia.com>> wrote:
Yes, that's the general design pattern. Another thing to look into is to compress the data. Now Kafka consumer/producer can already do it for you, but we choose to compress in the applications due to a historic issue that drgraded performance,  although it has been resolved now.
Also,  just keep in mind that while you do your batching, kafka producer also tries to batch msgs to Kafka, and you will need to ensure you have enough buffer memory. However that's all configurable.
Finally ensure you have the latest java updates and have kafka 0.10.2 or higher.
Jayesh

________________________________
From: Matt Daum <ma...@setfive.com>>
Sent: Sunday, March 4, 2018 7:06:19 AM
To: Thakrar, Jayesh
Cc: users@kafka.apache.org<ma...@kafka.apache.org>
Subject: Re: Kafka Setup for Daily counts on wide array of keys

We actually don't have a kafka cluster setup yet at all.  Right now just have 8 of our application servers.  We currently sample some impressions and then dedupe/count outside at a different DC, but are looking to try to analyze all impressions for some overall analytics.

Our requests are around 100-200 bytes each.  If we lost some of them due to network jitter etc. it would be fine we're trying to just get overall a rough count of each attribute.  Creating batched messages definitely makes sense and will also cut down on the network IO.

We're trying to determine the required setup for Kafka to do what we're looking to do as these are physical servers so we'll most likely need to buy new hardware.  For the first run I think we'll try it out on one of our application clusters that get a smaller amount traffic (300-400k req/sec) and run the kafka cluster on the same machines as the applications.

So would the best route here be something like each application server batches requests, send it to kafka, have a stream consumer that then tallies up the totals per attribute that we want to track, output that to a new topic, which then goes to a sink to either a DB or something like S3 which then we read into our external DBs?

Thanks!

On Sun, Mar 4, 2018 at 12:31 AM, Thakrar, Jayesh <jt...@conversantmedia.com>> wrote:
Matt,

If I understand correctly, you have an 8 node Kafka cluster and need to support  about 1 million requests/sec into the cluster from source servers and expect to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to achieve desired throughput.

So e.g. on the request receiving systems, I would suggest creating a logical avro file (byte buffer) of say N requests and then making that into one Kafka msg payload.

We have a similar situation (https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found anything from 4x to 10x better throughput with batching as compared to one request per msg.
We have different kinds of msgs/topics and the individual "request" size varies from  about 100 bytes to 1+ KB.

On 3/2/18, 8:24 AM, "Matt Daum" <ma...@setfive.com>> wrote:

    I am new to Kafka but I think I have a good use case for it.  I am trying
    to build daily counts of requests based on a number of different attributes
    in a high throughput system (~1 million requests/sec. across all  8
    servers).  The different attributes are unbounded in terms of values, and
    some will spread across 100's of millions values.  This is my current
    through process, let me know where I could be more efficient or if there is
    a better way to do it.

    I'll create an AVRO object "Impression" which has all the attributes of the
    inbound request.  My application servers then will on each request create
    and send this to a single kafka topic.

    I'll then have a consumer which creates a stream from the topic.  From
    there I'll use the windowed timeframes and groupBy to group by the
    attributes on each given day.  At the end of the day I'd need to read out
    the data store to an external system for storage.  Since I won't know all
    the values I'd need something similar to the KVStore.all() but for
    WindowedKV Stores.  This appears that it'd be possible in 1.1 with this
    commit:
    https://github.com/apache/kafka/commit/1d1c8575961bf6bce7decb049be7f10ca76bd0c5
    .

    Is this the best approach to doing this?  Or would I be better using the
    stream to listen and then an external DB like Aerospike to store the counts
    and read out of it directly end of day.

    Thanks for the help!
    Daum




Re: Kafka Setup for Daily counts on wide array of keys

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
I don’t have any experience/knowledge of the Kafka inbuilt datastore, but believe that for some
portions of streaming Kafka uses (used?) RocksDB to locally store some state info in the brokers.

Personally I would use an external datastore.
There's a wide choice out there, from regular key-value stores like Cassandra, ScyllaDB, and RocksDB, and time-series stores like InfluxDB, to regular RDBMSes.
If you have Hadoop in the picture, it's even possible to bypass a datastore completely (if appropriate) and store the raw data on HDFS organized by (say) date+hour,
using periodic (minute to hourly) extract jobs that write the data into a Hive-compatible directory structure as ORC or Parquet.
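
(An alternative to hand-rolled extract jobs would be Kafka Connect's HDFS sink.  The snippet
below is only a from-memory sketch using Confluent's HDFS sink connector and its time-based
partitioner, so every property name and value should be double-checked against that
connector's documentation before relying on it.)

name=impressions-hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=4
topics=impressions
hdfs.url=hdfs://namenode:8020
flush.size=100000
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en-US
timezone=UTC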

The reason for shying away from NoSQL datastores is their tendency to compact data, which leads to unnecessary reads and writes (referred to as write amplification).
With periodic jobs in Hadoop, you (usually) write your data only once. Of course, with that approach you lose "random/keyed access" to the data,
but if you are only interested in the aggregations across various dimensions, those can be stored in a SQL/NoSQL datastore.

As for "having millions of different values per grouped attribute" - not sure what you mean by them.
Is it that each record has some fields that represent different kinds of attributes and that their domain can have millions to hundreds of millions of values?
I don't think that should matter.


Re: Kafka Setup for Daily counts on wide array of keys

Posted by Matt Daum <ma...@setfive.com>.
Thanks! For the counts, I'd need to use a global table to make sure it's
across all the data, right?  Also, will having millions of different values per
grouped attribute scale OK?


Re: Kafka Setup for Daily counts on wide array of keys

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
Yes, that's the general design pattern. Another thing to look into is compressing the data. The Kafka consumer/producer can already do it for you, but we chose to compress in the applications due to a historic issue that degraded performance, although it has since been resolved.

Also, just keep in mind that while you do your batching, the Kafka producer also tries to batch messages to Kafka, and you will need to ensure you have enough buffer memory. That's all configurable, though.
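
(For illustration only, the relevant producer knobs are along these lines; the broker
address and the numbers shown are made-up starting points, not recommendations.)

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class BatchedImpressionProducer {
    static Producer<byte[], byte[]> newProducer() {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");                 // placeholder
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");                           // or snappy/gzip
        p.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);                                // bytes per partition batch
        p.put(ProducerConfig.LINGER_MS_CONFIG, 50);                                     // wait up to 50 ms to fill a batch
        p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728L);                         // total send buffer (128 MB)
        return new KafkaProducer<>(p);
    }
}

A larger batch.size plus a small linger.ms is usually what gets the throughput up at
these message rates, at the cost of a little extra latency.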

Finally, ensure you have the latest Java updates and Kafka 0.10.2 or higher.

Jayesh


Re: Kafka Setup for Daily counts on wide array of keys

Posted by Matt Daum <ma...@setfive.com>.
We actually don't have a Kafka cluster set up yet at all.  Right now we just
have 8 of our application servers.  We currently sample some impressions
and then dedupe/count them outside at a different DC, but are looking to try to
analyze all impressions for some overall analytics.

Our requests are around 100-200 bytes each.  If we lost some of them due to
network jitter etc. it would be fine; we're just trying to get an overall
rough count of each attribute.  Creating batched messages definitely makes
sense and will also cut down on the network IO.

We're trying to determine the required setup for Kafka to do what we're
looking to do, as these are physical servers, so we'll most likely need to
buy new hardware.  For the first run I think we'll try it out on one of our
application clusters that gets a smaller amount of traffic (300-400k req/sec)
and run the Kafka cluster on the same machines as the applications.

So would the best route here be something like: each application server
batches requests and sends them to Kafka; a streams consumer then tallies
up the totals per attribute that we want to track and outputs them to a
new topic; and that topic then goes to a sink, either a DB or something
like S3, which we then read into our external DBs?
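
(As a rough, untested sketch of the consuming side of that route -- the topic name, the
Impression Avro class, and the byte-array serdes are all assumptions, not anything agreed
in this thread -- the batched Avro payloads could be exploded back into individual
impressions before the grouping/counting discussed elsewhere:)

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class BatchUnbundler {
    // Explodes each batched Avro payload back into individual Impression records.
    static KStream<byte[], Impression> impressions(StreamsBuilder builder) {
        return builder.<byte[], byte[]>stream("impressions")   // placeholder topic, byte-array serdes assumed
                .flatMapValues(payload -> {
                    List<Impression> records = new ArrayList<>();
                    try (DataFileStream<Impression> in = new DataFileStream<>(
                            new ByteArrayInputStream(payload),
                            new SpecificDatumReader<>(Impression.class))) {
                        in.forEach(records::add);
                    } catch (IOException e) {
                        // a real job would route unreadable payloads to a dead-letter topic
                    }
                    return records;
                });
    }
}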

Thanks!


Re: Kafka Setup for Daily counts on wide array of keys

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
Matt,

If I understand correctly, you have an 8 node Kafka cluster and need to support  about 1 million requests/sec into the cluster from source servers and expect to consume that for aggregation.

How big are your msgs?

I would suggest looking into batching multiple requests per single Kafka msg to achieve desired throughput.

So e.g. on the request-receiving systems, I would suggest creating a logical Avro file (byte buffer) of, say, N requests and then making that into one Kafka msg payload.
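
(Something along these lines, as an untested sketch -- the topic name "impressions" and
the generated Avro Impression class are placeholders:)

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ImpressionBatcher {
    // Packs N Impression records into a single in-memory Avro container and sends it as one msg.
    static void sendBatch(Producer<byte[], byte[]> producer, List<Impression> batch) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<Impression> datumWriter = new SpecificDatumWriter<>(Impression.class);
        try (DataFileWriter<Impression> writer = new DataFileWriter<>(datumWriter)) {
            writer.create(Impression.getClassSchema(), out);
            for (Impression imp : batch) {
                writer.append(imp);
            }
        }
        producer.send(new ProducerRecord<>("impressions", out.toByteArray()));   // placeholder topic
    }
}

Because the payload is a self-describing Avro object container, the consumer can read it
back with DataFileStream without knowing the batch size in advance.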

We have a similar situation (https://www.slideshare.net/JayeshThakrar/apacheconflumekafka2016) and found anything from 4x to 10x better throughput with batching as compared to one request per msg.
We have different kinds of msgs/topics and the individual "request" size varies from about 100 bytes to 1+ KB.



Re: Kafka Setup for Daily counts on wide array of keys

Posted by Matt Daum <ma...@setfive.com>.
Actually, it looks like the better way would be to output the counts to a
new topic and then ingest that topic into the DB itself.  Is that the correct
way?
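
(Roughly, that would mean finishing the topology with something like the sketch below and
then pointing a sink connector -- Aerospike's, a JDBC sink, etc. -- or a small consumer at
the output topic.  The topic name "daily-counts" and the flattened key format are just
placeholders.)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Windowed;

public class CountsToTopic {
    // dailyCounts is the windowed count table produced by the aggregation step
    static void publish(KTable<Windowed<String>, Long> dailyCounts) {
        dailyCounts.toStream()
                   // flatten the windowed key into "attributeValue@windowStartMillis"
                   .map((wk, count) -> KeyValue.pair(wk.key() + "@" + wk.window().start(), count))
                   .to("daily-counts", Produced.with(Serdes.String(), Serdes.Long()));
    }
}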
