Posted to users@kafka.apache.org by Edvard Fagerholm <ed...@gmail.com> on 2023/05/29 20:54:58 UTC

Patterns for generating ordered streams

Hi there,

Kafka is new to us, so we don't have operational experience with it. I'm
building a system that broadcasts events to groups of clients. Kafka
consumers handle the broadcasting. In other words,

kafka topic --- consumers --- clients

We need to maintain the order of the events as they come through Kafka and
clients need to be able to retrieve from a DB any events posted while they
were offline. This means that the consumer also writes each event to a DB.
The number of clients is in the hundreds of millions.

What I'm trying to understand here is what I can use to sort the events in
the DB. A natural candidate would be to include the Kafka partition offset
as a range key in the DB and use it for sorting in queries, as it would
guarantee the same order. I would not use timestamps generated by the
consumer, since they aren't idempotent: a consumer could fail before acking
its last batch, and the retried batch would get different timestamps.
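
To make the idea concrete, here is a rough sketch of the consumer loop I
have in mind (writeEvent is a hypothetical helper standing in for our
Cassandra insert, not real code):

    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class OrderedIngest {
        public static void run(KafkaConsumer<String, byte[]> consumer) {
            consumer.subscribe(List.of("events"));
            while (true) {
                ConsumerRecords<String, byte[]> records =
                    consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, byte[]> r : records) {
                    // (partition, offset) is unique and monotonic within a
                    // partition, so reprocessing the same record writes the
                    // same row: the sort key is idempotent.
                    writeEvent(r.partition(), r.offset(), r.key(), r.value());
                }
                consumer.commitSync(); // commit only once the batch is in the DB
            }
        }

        static void writeEvent(int partition, long offset,
                               String key, byte[] value) {
            // hypothetical: INSERT with offset as the clustering/range key
        }
    }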

A problem with the partition offset that I haven't been able to find an
answer to in the docs is operational. If we ever want to move a topic to a
new cluster, the partitions of the topic in the new cluster need to start
their offsets where they ended in the old cluster. Is it possible to set
initial offsets for each partition when creating a topic?

Have other users solved this in other ways? Another idea I've had is to
keep a sequence number in memory on the consumer and store offset ->
sequence number mappings in the DB. When committing, the consumer would
delete the mappings from older batches. This allows a new consumer to
synchronize with any previous consumer and maintain idempotency.
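
Sketched out, the idea looks roughly like this (loadLastSequence,
writeEvent, rememberMapping, and pruneMappings are hypothetical DB helpers):

    // Recover the counter from the mapping stored for the last committed
    // offset, so a replacement consumer continues idempotently.
    long seq = loadLastSequence();
    while (true) {
        ConsumerRecords<String, byte[]> records =
            consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, byte[]> r : records) {
            seq++;
            writeEvent(seq, r);               // events sort by seq in the DB
            rememberMapping(r.offset(), seq); // offset -> seq, for recovery
        }
        consumer.commitSync();
        pruneMappings(); // drop mappings from batches before this commit
    }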

I'm assuming this is a problem that has concerned many Kafka users. What we
can't do is maintain a sequence number in a DB and update it on each event.
The reason is that we use Cassandra for ingesting the events, and it does
not support a counter that can be incremented and read atomically without
lightweight transactions, so that would be very expensive on every insert
with the load we have.
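
To illustrate, getting read-and-increment semantics out of Cassandra would
mean a lightweight transaction on a regular column, i.e. a Paxos round per
event. With the DataStax Java driver it would look something like this
(table and column names are made up):

    import com.datastax.oss.driver.api.core.CqlSession;
    import com.datastax.oss.driver.api.core.cql.ResultSet;
    import com.datastax.oss.driver.api.core.cql.SimpleStatement;

    // given an existing CqlSession "session" and the last value we read
    // as "current"; the IF clause makes the update conditional, which
    // Cassandra executes via Paxos (slow at our insert rate)
    ResultSet rs = session.execute(SimpleStatement.newInstance(
        "UPDATE sequences SET seq = ? WHERE name = 'events' IF seq = ?",
        current + 1, current));
    if (!rs.wasApplied()) {
        // another writer won the race: re-read the value and retry
    }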

Best,
Edvard

Re: Patterns for generating ordered streams

Posted by Italo Nesi <it...@gmail.com>.
Hi Edvard, if you use the partition number and/or partition offset, you are
potentially building up technical debt and tightly coupling your pipeline
to Kafka internals. What if you want to change the number of partitions in
the future? And there is what you mentioned about moving to another Kafka
cluster, where you have no control over setting the initial offsets. If you
need to sink data from Kafka to a DB, the most robust way is via a sink
connector (if you care about exactly-once semantics). There are several
available: PostgreSQL, Cassandra, MongoDB, etc.
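
To give a flavour, a minimal config for the Confluent JDBC sink into
PostgreSQL might look like the following (all names and the URL are
placeholders; Cassandra and MongoDB sinks are configured along the same
lines):

    name=events-sink
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    topics=events
    connection.url=jdbc:postgresql://db:5432/events
    insert.mode=upsert
    # key the rows by the record key; sorting comes from the payload
    pk.mode=record_key
    pk.fields=client_id
    auto.create=true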
As for queryability on the DB side, I'd recommend designing your queries
around the data in the payload/events themselves, or, to keep it simple,
indexing by the event key and timestamp. I know a timestamp can have
different meanings in Kafka, but as long as the same key comes from the
same producer, it should be fine for keeping the order of the events. If
not, I'd see if there is a way to make the timestamp part of the event
value when it is produced in the first place, as with, for example, IoT
devices that collect data every minute but only transmit every hour (the
timestamp must be part of the payload in such scenarios).
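
For instance, the producer can stamp the record with the source event time
and keep a stable key so ordering is preserved per key. A sketch (measurement,
deviceId, and payloadWithTimestamp are made-up names for your data):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    long eventTime = measurement.collectedAtMillis(); // device time, not broker time
    producer.send(new ProducerRecord<>(
        "events",             // topic
        null,                 // partition: derived from the key, so the
                              // same key always lands on the same partition
        eventTime,            // record timestamp (CreateTime)
        deviceId,             // key: preserves per-device ordering
        payloadWithTimestamp  // event time also carried inside the value
    ));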

I hope I have helped. One last thing: are you self-managing your Kafka
clusters and taking care of maintenance, upgrades, support, etc., or are
you using a SaaS provider?

Re: Patterns for generating ordered streams

Posted by "Brebner, Paul" <Pa...@netapp.com.INVALID>.
Hi Edvard, interesting problem. I've had similar problems with high fan-out use cases, but only for demo applications where I was more interested in scale than order. For example, have a look at this list of blogs; examples include Anomalia Machina (Kafka + Cassandra) and Kongo (Kafka + IoT): https://www.linkedin.com/pulse/complete-guide-apache-kafka-developers-everything-i-know-paul-brebner/

Regards, Paul

Re: Patterns for generating ordered streams

Posted by "Brebner, Paul" <Pa...@netapp.com.INVALID>.
Oh, and the Kafka parallel consumer may potentially help: https://www.instaclustr.com/blog/improving-apache-kafka-performance-and-scalability-with-the-parallel-consumer-part-2/

Paul
