You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@couchdb.apache.org by Adam Kocoloski <ko...@apache.org> on 2019/03/20 22:47:42 UTC

[DISCUSS] _db_updates feed in FoundationDB

Hi all,

Most of the discussions so far have focused on the core features that are fundamental to CouchDB: JSON documents, revision tracking, _changes. I thought I’d start a thread on something a bit different: the _db_updates feed.

The _db_updates feed is an API that enables users to discover database lifecycle events across an entire CouchDB instance. It’s primarily useful in deployments that have lots and lots of databases, where it’s impractical to keep connections open for every database, and where database creations and deletions may be an automated aspect of the application’s use of CouchDB.

There are really two topics for discussion here. The first is: do we need to keep it? The primary driver of applications creating lots of DBs is the per-DB granularity of access controls; if we go down the route of implementing the document-level _access proposal perhaps users naturally migrate away from this DB-per-user data model. I’d be curious to hear points of view there.

I’ll assume for now that we do want to keep it, and offer some thoughts on how to implement it. The main challenge with _db_updates is managing the write contention; in write-heavy databases you have a lot of producers trying to tag that particular database as “updated", but all the consumer really cares about is getting a single “dbname”:”updated” event as needed. In the current architecture we try to dedupe a lot of the events in-memory before updating a regular CouchDB database with this information, but this leaves us exposed to possibly dropping events within a few second window.

## Option 1: Queue + Compaction

One way to tackle this in FoundationDB is to have an intermediate subspace reserved as a queue. Each transaction that modifies a database would insert a versionstamped KV into the queue like

Versionstamp = (DbName, EventType)

Versionstamps are monotonically increasing and inserting versionstamped keys is a conflict-free operation. We’d have a consumer of this queue which is responsible for “log compaction”; i.e., the consumer would do range reads on the queue subspace, toss out duplicate contiguous “dbname”:“updated” events, and update a second index which would look more like the _changes feed.

### Scaling Consumers

A single consumer can likely process 10k events/sec or more, but eventually we’ll need to scale. Borrowing from systems like Kafka the typical way to do this is to divide the queue into partitions and have individual consumers mapped to each partition. A partition in this model would just be a prefix on the Versionstamp:

(PartitionID, Versionstamp) = (DbName, EventType)

Our consumers will be more efficient and less likely to conflict with one another on updating the _db_updates index if messages are keyed to a partition based on DbName, although this still runs the risk that a couple of high-throughput databases could swamp a partition.

I’m not sure about the best path forward for handling that scenario. One could implement a rate-limiter that starts sloughing off additional messages for high-throughput databases (which has some careful edge cases), split the messages for a single database across multiple partitions, rely on operators to blacklist certain databases from the _db_updates system, etc. Each has downsides.

## Option 2: Atomic Ops + Consumer

In this approach we still have an intermediate subspace, and a consumer of that subspace which updates the _db_updates index. But this time, we have at most one KV per database in the subspace, with an atomic counter for a value. When a document is updated it bumps the counter for its database in that subspace. So we’ll have entries like

(“counters”, “db1235”) = 1
(“counters”, “db0001”) = 42
(“counters”, “telemetry-db”) = 12312

and so on. Like versionstamps, atomic operations are conflict-free so we need not worry about introducing spurious conflicts on high-throughput databases.

The initial pass of the consumer logic would go something like this:

- Do a snapshot range read of the “counters” subspace (or whatever we call it)
- Record the current values for all counters in a separate summary KV (you’ll see why in a minute)
- Do a limit=1 range read on the _changes space for each DB in the list to grab the latest Sequence
- Update the _db_updates index with the latest Sequence for each of these databases

On a second pass, the consumer would read the summary KV from the last pass and compare the previous counters with the current values. If any counters have not been updated in the interval, the consumer would try to clear those from the “counters” subspace (adding them as explicit conflict keys to ensure we don’t miss a concurrent update). It would then proceed with the rest of the logic from the initial pass. This is a careful balancing act:

- We don’t want to pollute the “counters” subspace with idle databases because each entry requires an extra read of _changes
- We don’t want to attempt to clear counters that are constantly updated because that’s going to fail with a conflict every time

The scalability axis here is the number of databases updated within any short window of time (~1 second or less). If we end up with that number growing large we can have consumers responsible for range of the “counters” subspace, though I think that’s less likely than in the queue-based design.

I don’t know in detail what optimizations FoundationDB applies to atomic operations (e.g. coalescing them at a layer above the storage engine). That’s worth checking into, as otherwise I’d be concerned about introducing super-hot keys here.

This option does not handle the “created” and “deleted” lifecycle events for each database, but those are really quite simple and could really be inserted directly into the _db_updates index.

===

There are some additional details which can be fleshed out in an RFC, but this is the basic gist of things. Both designs would be more robust at capturing every single updated database (because the enqueue/increment operation would be part of the document update transaction). They would allow for a small delay between the document update and the appearance of the database in _db_updates, which is no different than we have today. They each require a background process.

Let’s hear what you think, both about the interest level for this feature and any comments on the designs. I may take this one over to the FDB forums as well for feedback. Cheers,

Adam

Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Steven Le Roux <le...@gmail.com>.
>> If you map a consumer with a partition, you will really suffer with
operational issues.

> I’m curious to hear a little more about this one. Are you simply saying
that decoupling partitions and consumers has a bunch of Ops benefits, or
are you saying that any time a consumer group with N consumers is reading
from a topic with N partitions that's asking for trouble?

Actually, what you're looking for is how much groups you will need. The
real dynamic aspect of consumption here is binding a subscriber to a
consumer group so that each subscriber will see the same update feed. It's
not a ideal design for multi tenancy though.

Partitions in the Kafka way to see them is not something very light or
logical since they define physical directories where files are written. You
can have hundreds of them, maybe few thousands, but they come with
drawbacks :
 - if you want a customer per partition, you need the producer to push the
data to all partitions so that all consumers will the the same
 - partitions number can be resized, but carefully since it implies
capacity planning and disk space availability given the data distribution.
 - resizing is a sensible operation, if you add partitions, just ensure
disk availability on brokers that will welcome new ones, but if you size
them down, you may encounter issues with ordering of your transactions.
 - partitions are filled by producers based on a partitionner. If you don't
achieve a good distribution among partitions, you may encounter situations
where a disk is full when others are empty.

To me partitions are an internal aspect to Kafka to make it scale
horizontaly, and it can be related to the consumper scalability you're
looking for. Also a big difference is how you may want to expose an update
feed (subscribing from now, or catching up to a previous state).

For many of the above aspects, even if Kafka remains a great piece of art,
Pulsar is doing a better job as a distributed log. Based on Bookkeeper,
data are split into segments that are more fine grained than Partitions and
easier to manage/move etc. Still they're not more usefull for consumers and
Pulsar have Subscriptions where Kafka have Consumer Groups. Consumers are
just a way to scale the consumption ( read more here
http://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#subscription-modes
)

I hope it helps :)


On Mon, Mar 25, 2019 at 7:41 PM Adam Kocoloski <ko...@apache.org> wrote:

> Hi Steven, yep, thanks for adding that. When I wrote this I was
> double-checking the behavior of consumer groups to see if it was ever
> useful to have more consumers in a consumer group than partitions in the
> associated topic. Seems like the answer is no — partitions need to be
> allocated with an eye towards consumer throughput, not just producer
> throughput.
>
> > If you map a consumer with a partition, you will really suffer with
> operational issues.
>
> I’m curious to hear a little more about this one. Are you simply saying
> that decoupling partitions and consumers has a bunch of Ops benefits, or
> are you saying that any time a consumer group with N consumers is reading
> from a topic with N partitions that's asking for trouble?
>
> Pulsar / Kafka as a tool for change feed storage is a really interesting
> conversation. I’ve previously taken a close look at both of them for that
> use case and I do think there’s a lot of potential. I know Pulsar in
> particular was designed to solve that sort of problem at scale. I had been
> presuming that this would be a “both/and” kind of thing; i.e., every
> CouchDB would still have a built-in _changes feed, but via Pulsar I/O or
> Kafka Connect you could easily wire that feed into the message bus for
> improved distribution, a broader ecosystem, better management, etc. When
> you start talking about replacing the internal feed altogether there are a
> number of complications:
>
> - Transactional updates across both FoundationDB and Pulsar/Kafka is a
> hard problem
> - Documents showing up "exactly once” is possible with topic compaction,
> but not transactional
> - Internal consumers of the change feed, e.g. the secondary view system
> would get more complicated
>
> I like that I’m not the only one thinking these things go hand-in-hand,
> though :)
>
> Adam
>
> > On Mar 25, 2019, at 1:06 PM, Steven Le Roux <le...@gmail.com>
> wrote:
> >
> > Hi Adam,
> >
> > Regarding the consumer scaling point, I just want to mention that
> > Partitions are used in Kafka (or Pulsar) to scale a topic but not
> directly
> > consumers. If you map a consumer with a partition, you will really suffer
> > with operational issues.
> >
> > Consumers are managed in "consumer groups" that isolate them which is a
> > handy way to choose between a shared message queue to a pub/sub model. If
> > you need consumers to read the same data, then you need them to have a
> > different consumer group id. They're used to track distribution accross
> > consumers by assigning partitions to consumers. Only one consumer in a
> > consumer group can subscribe a given partition.
> >
> > While going distributed, I'd find the choice of relying on Pulsar or
> > Bookkeeper or Kafka as the distributed Log very interesting.
> >
> > I would find very smart to provide two version of CouchDB :
> > - a classic standalone/master/slave
> > - a distributed with FoundationDB layer for KV and a distributed Log
> > option for the feeds/changes.
> >
> >
> > On Wed, Mar 20, 2019 at 11:47 PM Adam Kocoloski <ko...@apache.org>
> wrote:
> >
> >> Hi all,
> >>
> >> Most of the discussions so far have focused on the core features that
> are
> >> fundamental to CouchDB: JSON documents, revision tracking, _changes. I
> >> thought I’d start a thread on something a bit different: the _db_updates
> >> feed.
> >>
> >> The _db_updates feed is an API that enables users to discover database
> >> lifecycle events across an entire CouchDB instance. It’s primarily
> useful
> >> in deployments that have lots and lots of databases, where it’s
> impractical
> >> to keep connections open for every database, and where database
> creations
> >> and deletions may be an automated aspect of the application’s use of
> >> CouchDB.
> >>
> >> There are really two topics for discussion here. The first is: do we
> need
> >> to keep it? The primary driver of applications creating lots of DBs is
> the
> >> per-DB granularity of access controls; if we go down the route of
> >> implementing the document-level _access proposal perhaps users naturally
> >> migrate away from this DB-per-user data model. I’d be curious to hear
> >> points of view there.
> >>
> >> I’ll assume for now that we do want to keep it, and offer some thoughts
> on
> >> how to implement it. The main challenge with _db_updates is managing the
> >> write contention; in write-heavy databases you have a lot of producers
> >> trying to tag that particular database as “updated", but all the
> consumer
> >> really cares about is getting a single “dbname”:”updated” event as
> needed.
> >> In the current architecture we try to dedupe a lot of the events
> in-memory
> >> before updating a regular CouchDB database with this information, but
> this
> >> leaves us exposed to possibly dropping events within a few second
> window.
> >>
> >> ## Option 1: Queue + Compaction
> >>
> >> One way to tackle this in FoundationDB is to have an intermediate
> subspace
> >> reserved as a queue. Each transaction that modifies a database would
> insert
> >> a versionstamped KV into the queue like
> >>
> >> Versionstamp = (DbName, EventType)
> >>
> >> Versionstamps are monotonically increasing and inserting versionstamped
> >> keys is a conflict-free operation. We’d have a consumer of this queue
> which
> >> is responsible for “log compaction”; i.e., the consumer would do range
> >> reads on the queue subspace, toss out duplicate contiguous
> >> “dbname”:“updated” events, and update a second index which would look
> more
> >> like the _changes feed.
> >>
> >> ### Scaling Consumers
> >>
> >> A single consumer can likely process 10k events/sec or more, but
> >> eventually we’ll need to scale. Borrowing from systems like Kafka the
> >> typical way to do this is to divide the queue into partitions and have
> >> individual consumers mapped to each partition. A partition in this model
> >> would just be a prefix on the Versionstamp:
> >>
> >> (PartitionID, Versionstamp) = (DbName, EventType)
> >>
> >> Our consumers will be more efficient and less likely to conflict with
> one
> >> another on updating the _db_updates index if messages are keyed to a
> >> partition based on DbName, although this still runs the risk that a
> couple
> >> of high-throughput databases could swamp a partition.
> >>
> >> I’m not sure about the best path forward for handling that scenario. One
> >> could implement a rate-limiter that starts sloughing off additional
> >> messages for high-throughput databases (which has some careful edge
> cases),
> >> split the messages for a single database across multiple partitions,
> rely
> >> on operators to blacklist certain databases from the _db_updates system,
> >> etc. Each has downsides.
> >>
> >> ## Option 2: Atomic Ops + Consumer
> >>
> >> In this approach we still have an intermediate subspace, and a consumer
> of
> >> that subspace which updates the _db_updates index. But this time, we
> have
> >> at most one KV per database in the subspace, with an atomic counter for
> a
> >> value. When a document is updated it bumps the counter for its database
> in
> >> that subspace. So we’ll have entries like
> >>
> >> (“counters”, “db1235”) = 1
> >> (“counters”, “db0001”) = 42
> >> (“counters”, “telemetry-db”) = 12312
> >>
> >> and so on. Like versionstamps, atomic operations are conflict-free so we
> >> need not worry about introducing spurious conflicts on high-throughput
> >> databases.
> >>
> >> The initial pass of the consumer logic would go something like this:
> >>
> >> - Do a snapshot range read of the “counters” subspace (or whatever we
> call
> >> it)
> >> - Record the current values for all counters in a separate summary KV
> >> (you’ll see why in a minute)
> >> - Do a limit=1 range read on the _changes space for each DB in the list
> to
> >> grab the latest Sequence
> >> - Update the _db_updates index with the latest Sequence for each of
> these
> >> databases
> >>
> >> On a second pass, the consumer would read the summary KV from the last
> >> pass and compare the previous counters with the current values. If any
> >> counters have not been updated in the interval, the consumer would try
> to
> >> clear those from the “counters” subspace (adding them as explicit
> conflict
> >> keys to ensure we don’t miss a concurrent update). It would then proceed
> >> with the rest of the logic from the initial pass. This is a careful
> >> balancing act:
> >>
> >> - We don’t want to pollute the “counters” subspace with idle databases
> >> because each entry requires an extra read of _changes
> >> - We don’t want to attempt to clear counters that are constantly updated
> >> because that’s going to fail with a conflict every time
> >>
> >> The scalability axis here is the number of databases updated within any
> >> short window of time (~1 second or less). If we end up with that number
> >> growing large we can have consumers responsible for range of the
> “counters”
> >> subspace, though I think that’s less likely than in the queue-based
> design.
> >>
> >> I don’t know in detail what optimizations FoundationDB applies to atomic
> >> operations (e.g. coalescing them at a layer above the storage engine).
> >> That’s worth checking into, as otherwise I’d be concerned about
> introducing
> >> super-hot keys here.
> >>
> >> This option does not handle the “created” and “deleted” lifecycle events
> >> for each database, but those are really quite simple and could really be
> >> inserted directly into the _db_updates index.
> >>
> >> ===
> >>
> >> There are some additional details which can be fleshed out in an RFC,
> but
> >> this is the basic gist of things. Both designs would be more robust at
> >> capturing every single updated database (because the enqueue/increment
> >> operation would be part of the document update transaction). They would
> >> allow for a small delay between the document update and the appearance
> of
> >> the database in _db_updates, which is no different than we have today.
> They
> >> each require a background process.
> >>
> >> Let’s hear what you think, both about the interest level for this
> feature
> >> and any comments on the designs. I may take this one over to the FDB
> forums
> >> as well for feedback. Cheers,
> >>
> >> Adam
>
>

Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Adam Kocoloski <ko...@apache.org>.
Hi Steven, yep, thanks for adding that. When I wrote this I was double-checking the behavior of consumer groups to see if it was ever useful to have more consumers in a consumer group than partitions in the associated topic. Seems like the answer is no — partitions need to be allocated with an eye towards consumer throughput, not just producer throughput.

> If you map a consumer with a partition, you will really suffer with operational issues.

I’m curious to hear a little more about this one. Are you simply saying that decoupling partitions and consumers has a bunch of Ops benefits, or are you saying that any time a consumer group with N consumers is reading from a topic with N partitions that's asking for trouble?

Pulsar / Kafka as a tool for change feed storage is a really interesting conversation. I’ve previously taken a close look at both of them for that use case and I do think there’s a lot of potential. I know Pulsar in particular was designed to solve that sort of problem at scale. I had been presuming that this would be a “both/and” kind of thing; i.e., every CouchDB would still have a built-in _changes feed, but via Pulsar I/O or Kafka Connect you could easily wire that feed into the message bus for improved distribution, a broader ecosystem, better management, etc. When you start talking about replacing the internal feed altogether there are a number of complications:

- Transactional updates across both FoundationDB and Pulsar/Kafka is a hard problem
- Documents showing up "exactly once” is possible with topic compaction, but not transactional
- Internal consumers of the change feed, e.g. the secondary view system would get more complicated

I like that I’m not the only one thinking these things go hand-in-hand, though :)

Adam

> On Mar 25, 2019, at 1:06 PM, Steven Le Roux <le...@gmail.com> wrote:
> 
> Hi Adam,
> 
> Regarding the consumer scaling point, I just want to mention that
> Partitions are used in Kafka (or Pulsar) to scale a topic but not directly
> consumers. If you map a consumer with a partition, you will really suffer
> with operational issues.
> 
> Consumers are managed in "consumer groups" that isolate them which is a
> handy way to choose between a shared message queue to a pub/sub model. If
> you need consumers to read the same data, then you need them to have a
> different consumer group id. They're used to track distribution accross
> consumers by assigning partitions to consumers. Only one consumer in a
> consumer group can subscribe a given partition.
> 
> While going distributed, I'd find the choice of relying on Pulsar or
> Bookkeeper or Kafka as the distributed Log very interesting.
> 
> I would find very smart to provide two version of CouchDB :
> - a classic standalone/master/slave
> - a distributed with FoundationDB layer for KV and a distributed Log
> option for the feeds/changes.
> 
> 
> On Wed, Mar 20, 2019 at 11:47 PM Adam Kocoloski <ko...@apache.org> wrote:
> 
>> Hi all,
>> 
>> Most of the discussions so far have focused on the core features that are
>> fundamental to CouchDB: JSON documents, revision tracking, _changes. I
>> thought I’d start a thread on something a bit different: the _db_updates
>> feed.
>> 
>> The _db_updates feed is an API that enables users to discover database
>> lifecycle events across an entire CouchDB instance. It’s primarily useful
>> in deployments that have lots and lots of databases, where it’s impractical
>> to keep connections open for every database, and where database creations
>> and deletions may be an automated aspect of the application’s use of
>> CouchDB.
>> 
>> There are really two topics for discussion here. The first is: do we need
>> to keep it? The primary driver of applications creating lots of DBs is the
>> per-DB granularity of access controls; if we go down the route of
>> implementing the document-level _access proposal perhaps users naturally
>> migrate away from this DB-per-user data model. I’d be curious to hear
>> points of view there.
>> 
>> I’ll assume for now that we do want to keep it, and offer some thoughts on
>> how to implement it. The main challenge with _db_updates is managing the
>> write contention; in write-heavy databases you have a lot of producers
>> trying to tag that particular database as “updated", but all the consumer
>> really cares about is getting a single “dbname”:”updated” event as needed.
>> In the current architecture we try to dedupe a lot of the events in-memory
>> before updating a regular CouchDB database with this information, but this
>> leaves us exposed to possibly dropping events within a few second window.
>> 
>> ## Option 1: Queue + Compaction
>> 
>> One way to tackle this in FoundationDB is to have an intermediate subspace
>> reserved as a queue. Each transaction that modifies a database would insert
>> a versionstamped KV into the queue like
>> 
>> Versionstamp = (DbName, EventType)
>> 
>> Versionstamps are monotonically increasing and inserting versionstamped
>> keys is a conflict-free operation. We’d have a consumer of this queue which
>> is responsible for “log compaction”; i.e., the consumer would do range
>> reads on the queue subspace, toss out duplicate contiguous
>> “dbname”:“updated” events, and update a second index which would look more
>> like the _changes feed.
>> 
>> ### Scaling Consumers
>> 
>> A single consumer can likely process 10k events/sec or more, but
>> eventually we’ll need to scale. Borrowing from systems like Kafka the
>> typical way to do this is to divide the queue into partitions and have
>> individual consumers mapped to each partition. A partition in this model
>> would just be a prefix on the Versionstamp:
>> 
>> (PartitionID, Versionstamp) = (DbName, EventType)
>> 
>> Our consumers will be more efficient and less likely to conflict with one
>> another on updating the _db_updates index if messages are keyed to a
>> partition based on DbName, although this still runs the risk that a couple
>> of high-throughput databases could swamp a partition.
>> 
>> I’m not sure about the best path forward for handling that scenario. One
>> could implement a rate-limiter that starts sloughing off additional
>> messages for high-throughput databases (which has some careful edge cases),
>> split the messages for a single database across multiple partitions, rely
>> on operators to blacklist certain databases from the _db_updates system,
>> etc. Each has downsides.
>> 
>> ## Option 2: Atomic Ops + Consumer
>> 
>> In this approach we still have an intermediate subspace, and a consumer of
>> that subspace which updates the _db_updates index. But this time, we have
>> at most one KV per database in the subspace, with an atomic counter for a
>> value. When a document is updated it bumps the counter for its database in
>> that subspace. So we’ll have entries like
>> 
>> (“counters”, “db1235”) = 1
>> (“counters”, “db0001”) = 42
>> (“counters”, “telemetry-db”) = 12312
>> 
>> and so on. Like versionstamps, atomic operations are conflict-free so we
>> need not worry about introducing spurious conflicts on high-throughput
>> databases.
>> 
>> The initial pass of the consumer logic would go something like this:
>> 
>> - Do a snapshot range read of the “counters” subspace (or whatever we call
>> it)
>> - Record the current values for all counters in a separate summary KV
>> (you’ll see why in a minute)
>> - Do a limit=1 range read on the _changes space for each DB in the list to
>> grab the latest Sequence
>> - Update the _db_updates index with the latest Sequence for each of these
>> databases
>> 
>> On a second pass, the consumer would read the summary KV from the last
>> pass and compare the previous counters with the current values. If any
>> counters have not been updated in the interval, the consumer would try to
>> clear those from the “counters” subspace (adding them as explicit conflict
>> keys to ensure we don’t miss a concurrent update). It would then proceed
>> with the rest of the logic from the initial pass. This is a careful
>> balancing act:
>> 
>> - We don’t want to pollute the “counters” subspace with idle databases
>> because each entry requires an extra read of _changes
>> - We don’t want to attempt to clear counters that are constantly updated
>> because that’s going to fail with a conflict every time
>> 
>> The scalability axis here is the number of databases updated within any
>> short window of time (~1 second or less). If we end up with that number
>> growing large we can have consumers responsible for range of the “counters”
>> subspace, though I think that’s less likely than in the queue-based design.
>> 
>> I don’t know in detail what optimizations FoundationDB applies to atomic
>> operations (e.g. coalescing them at a layer above the storage engine).
>> That’s worth checking into, as otherwise I’d be concerned about introducing
>> super-hot keys here.
>> 
>> This option does not handle the “created” and “deleted” lifecycle events
>> for each database, but those are really quite simple and could really be
>> inserted directly into the _db_updates index.
>> 
>> ===
>> 
>> There are some additional details which can be fleshed out in an RFC, but
>> this is the basic gist of things. Both designs would be more robust at
>> capturing every single updated database (because the enqueue/increment
>> operation would be part of the document update transaction). They would
>> allow for a small delay between the document update and the appearance of
>> the database in _db_updates, which is no different than we have today. They
>> each require a background process.
>> 
>> Let’s hear what you think, both about the interest level for this feature
>> and any comments on the designs. I may take this one over to the FDB forums
>> as well for feedback. Cheers,
>> 
>> Adam


Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Steven Le Roux <le...@gmail.com>.
Hi Adam,

Regarding the consumer scaling point, I just want to mention that
Partitions are used in Kafka (or Pulsar) to scale a topic but not directly
consumers. If you map a consumer with a partition, you will really suffer
with operational issues.

Consumers are managed in "consumer groups" that isolate them which is a
handy way to choose between a shared message queue to a pub/sub model. If
you need consumers to read the same data, then you need them to have a
different consumer group id. They're used to track distribution accross
consumers by assigning partitions to consumers. Only one consumer in a
consumer group can subscribe a given partition.

While going distributed, I'd find the choice of relying on Pulsar or
Bookkeeper or Kafka as the distributed Log very interesting.

I would find very smart to provide two version of CouchDB :
 - a classic standalone/master/slave
 - a distributed with FoundationDB layer for KV and a distributed Log
option for the feeds/changes.


On Wed, Mar 20, 2019 at 11:47 PM Adam Kocoloski <ko...@apache.org> wrote:

> Hi all,
>
> Most of the discussions so far have focused on the core features that are
> fundamental to CouchDB: JSON documents, revision tracking, _changes. I
> thought I’d start a thread on something a bit different: the _db_updates
> feed.
>
> The _db_updates feed is an API that enables users to discover database
> lifecycle events across an entire CouchDB instance. It’s primarily useful
> in deployments that have lots and lots of databases, where it’s impractical
> to keep connections open for every database, and where database creations
> and deletions may be an automated aspect of the application’s use of
> CouchDB.
>
> There are really two topics for discussion here. The first is: do we need
> to keep it? The primary driver of applications creating lots of DBs is the
> per-DB granularity of access controls; if we go down the route of
> implementing the document-level _access proposal perhaps users naturally
> migrate away from this DB-per-user data model. I’d be curious to hear
> points of view there.
>
> I’ll assume for now that we do want to keep it, and offer some thoughts on
> how to implement it. The main challenge with _db_updates is managing the
> write contention; in write-heavy databases you have a lot of producers
> trying to tag that particular database as “updated", but all the consumer
> really cares about is getting a single “dbname”:”updated” event as needed.
> In the current architecture we try to dedupe a lot of the events in-memory
> before updating a regular CouchDB database with this information, but this
> leaves us exposed to possibly dropping events within a few second window.
>
> ## Option 1: Queue + Compaction
>
> One way to tackle this in FoundationDB is to have an intermediate subspace
> reserved as a queue. Each transaction that modifies a database would insert
> a versionstamped KV into the queue like
>
> Versionstamp = (DbName, EventType)
>
> Versionstamps are monotonically increasing and inserting versionstamped
> keys is a conflict-free operation. We’d have a consumer of this queue which
> is responsible for “log compaction”; i.e., the consumer would do range
> reads on the queue subspace, toss out duplicate contiguous
> “dbname”:“updated” events, and update a second index which would look more
> like the _changes feed.
>
> ### Scaling Consumers
>
> A single consumer can likely process 10k events/sec or more, but
> eventually we’ll need to scale. Borrowing from systems like Kafka the
> typical way to do this is to divide the queue into partitions and have
> individual consumers mapped to each partition. A partition in this model
> would just be a prefix on the Versionstamp:
>
> (PartitionID, Versionstamp) = (DbName, EventType)
>
> Our consumers will be more efficient and less likely to conflict with one
> another on updating the _db_updates index if messages are keyed to a
> partition based on DbName, although this still runs the risk that a couple
> of high-throughput databases could swamp a partition.
>
> I’m not sure about the best path forward for handling that scenario. One
> could implement a rate-limiter that starts sloughing off additional
> messages for high-throughput databases (which has some careful edge cases),
> split the messages for a single database across multiple partitions, rely
> on operators to blacklist certain databases from the _db_updates system,
> etc. Each has downsides.
>
> ## Option 2: Atomic Ops + Consumer
>
> In this approach we still have an intermediate subspace, and a consumer of
> that subspace which updates the _db_updates index. But this time, we have
> at most one KV per database in the subspace, with an atomic counter for a
> value. When a document is updated it bumps the counter for its database in
> that subspace. So we’ll have entries like
>
> (“counters”, “db1235”) = 1
> (“counters”, “db0001”) = 42
> (“counters”, “telemetry-db”) = 12312
>
> and so on. Like versionstamps, atomic operations are conflict-free so we
> need not worry about introducing spurious conflicts on high-throughput
> databases.
>
> The initial pass of the consumer logic would go something like this:
>
> - Do a snapshot range read of the “counters” subspace (or whatever we call
> it)
> - Record the current values for all counters in a separate summary KV
> (you’ll see why in a minute)
> - Do a limit=1 range read on the _changes space for each DB in the list to
> grab the latest Sequence
> - Update the _db_updates index with the latest Sequence for each of these
> databases
>
> On a second pass, the consumer would read the summary KV from the last
> pass and compare the previous counters with the current values. If any
> counters have not been updated in the interval, the consumer would try to
> clear those from the “counters” subspace (adding them as explicit conflict
> keys to ensure we don’t miss a concurrent update). It would then proceed
> with the rest of the logic from the initial pass. This is a careful
> balancing act:
>
> - We don’t want to pollute the “counters” subspace with idle databases
> because each entry requires an extra read of _changes
> - We don’t want to attempt to clear counters that are constantly updated
> because that’s going to fail with a conflict every time
>
> The scalability axis here is the number of databases updated within any
> short window of time (~1 second or less). If we end up with that number
> growing large we can have consumers responsible for range of the “counters”
> subspace, though I think that’s less likely than in the queue-based design.
>
> I don’t know in detail what optimizations FoundationDB applies to atomic
> operations (e.g. coalescing them at a layer above the storage engine).
> That’s worth checking into, as otherwise I’d be concerned about introducing
> super-hot keys here.
>
> This option does not handle the “created” and “deleted” lifecycle events
> for each database, but those are really quite simple and could really be
> inserted directly into the _db_updates index.
>
> ===
>
> There are some additional details which can be fleshed out in an RFC, but
> this is the basic gist of things. Both designs would be more robust at
> capturing every single updated database (because the enqueue/increment
> operation would be part of the document update transaction). They would
> allow for a small delay between the document update and the appearance of
> the database in _db_updates, which is no different than we have today. They
> each require a background process.
>
> Let’s hear what you think, both about the interest level for this feature
> and any comments on the designs. I may take this one over to the FDB forums
> as well for feedback. Cheers,
>
> Adam

Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Jan Lehnardt <ja...@apache.org>.
It’s entirely possible that I haven’t gone as deep into it. And I’m not yet up to speed on the subtleties of FDB performance characteristics :)

> On 25. Mar 2019, at 21:20, Adam Kocoloski <ko...@apache.org> wrote:
> 
> 
>> On Mar 25, 2019, at 6:02 AM, Jan Lehnardt <ja...@apache.org> wrote:
>> 
>> Gut feel I prefer Option 1.
> 
> Interesting. Option 1 was definitely my first idea on solving this problem, but the deeper I delved into the performance profile the more worried I got about it and so I started thinking about alternatives.
> 
> Adam

-- 
Professional Support for Apache CouchDB:
https://neighbourhood.ie/couchdb-support/


Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Adam Kocoloski <ko...@apache.org>.
> On Mar 25, 2019, at 6:02 AM, Jan Lehnardt <ja...@apache.org> wrote:
> 
> Gut feel I prefer Option 1.

Interesting. Option 1 was definitely my first idea on solving this problem, but the deeper I delved into the performance profile the more worried I got about it and so I started thinking about alternatives.

Adam

Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Jan Lehnardt <ja...@apache.org>.
Hi Adam,

thanks for drafting this,

> On 20. Mar 2019, at 23:47, Adam Kocoloski <ko...@apache.org> wrote:
> 
> Hi all,
> 
> Most of the discussions so far have focused on the core features that are fundamental to CouchDB: JSON documents, revision tracking, _changes. I thought I’d start a thread on something a bit different: the _db_updates feed.
> 
> The _db_updates feed is an API that enables users to discover database lifecycle events across an entire CouchDB instance. It’s primarily useful in deployments that have lots and lots of databases, where it’s impractical to keep connections open for every database, and where database creations and deletions may be an automated aspect of the application’s use of CouchDB.
> 
> There are really two topics for discussion here. The first is: do we need to keep it? The primary driver of applications creating lots of DBs is the per-DB granularity of access controls; if we go down the route of implementing the document-level _access proposal perhaps users naturally migrate away from this DB-per-user data model. I’d be curious to hear points of view there.
> 
> I’ll assume for now that we do want to keep it, and offer some thoughts on how to implement it. The main challenge with _db_updates is managing the write contention; in write-heavy databases you have a lot of producers trying to tag that particular database as “updated", but all the consumer really cares about is getting a single “dbname”:”updated” event as needed. In the current architecture we try to dedupe a lot of the events in-memory before updating a regular CouchDB database with this information, but this leaves us exposed to possibly dropping events within a few second window.

I’d also like to see this API stick around, mainly because we already see when discussing _access-design[1] that we probably won’t be able to cover all scenarios that people use db-per-user for.


[1]: https://lists.apache.org/thread.html/34b0d65a76b8b9532df7023c243b24df1afa8a9d2e8fe47645cd4272@%3Cdev.couchdb.apache.org%3E


> 
> ## Option 1: Queue + Compaction
> 
> One way to tackle this in FoundationDB is to have an intermediate subspace reserved as a queue. Each transaction that modifies a database would insert a versionstamped KV into the queue like
> 
> Versionstamp = (DbName, EventType)
> 
> Versionstamps are monotonically increasing and inserting versionstamped keys is a conflict-free operation. We’d have a consumer of this queue which is responsible for “log compaction”; i.e., the consumer would do range reads on the queue subspace, toss out duplicate contiguous “dbname”:“updated” events, and update a second index which would look more like the _changes feed.
> 
> ### Scaling Consumers
> 
> A single consumer can likely process 10k events/sec or more, but eventually we’ll need to scale. Borrowing from systems like Kafka the typical way to do this is to divide the queue into partitions and have individual consumers mapped to each partition. A partition in this model would just be a prefix on the Versionstamp:
> 
> (PartitionID, Versionstamp) = (DbName, EventType)
> 
> Our consumers will be more efficient and less likely to conflict with one another on updating the _db_updates index if messages are keyed to a partition based on DbName, although this still runs the risk that a couple of high-throughput databases could swamp a partition.
> 
> I’m not sure about the best path forward for handling that scenario. One could implement a rate-limiter that starts sloughing off additional messages for high-throughput databases (which has some careful edge cases), split the messages for a single database across multiple partitions, rely on operators to blacklist certain databases from the _db_updates system, etc. Each has downsides.

A minimum way out could be that we flag this in one of our _stats, so operators can do Something™ about it, which just might be to switch the modulo operator that distributes dbnames to partitions, it might not be pretty in terms of “toggle this and you are fine”, as you might have to go through a few iterations to get better partition use, but at least it is a way out that doesn’t sound too complicated.

Gut feel I prefer Option 1.

Best
Jan
—


> 
> ## Option 2: Atomic Ops + Consumer
> 
> In this approach we still have an intermediate subspace, and a consumer of that subspace which updates the _db_updates index. But this time, we have at most one KV per database in the subspace, with an atomic counter for a value. When a document is updated it bumps the counter for its database in that subspace. So we’ll have entries like
> 
> (“counters”, “db1235”) = 1
> (“counters”, “db0001”) = 42
> (“counters”, “telemetry-db”) = 12312
> 
> and so on. Like versionstamps, atomic operations are conflict-free so we need not worry about introducing spurious conflicts on high-throughput databases.
> 
> The initial pass of the consumer logic would go something like this:
> 
> - Do a snapshot range read of the “counters” subspace (or whatever we call it)
> - Record the current values for all counters in a separate summary KV (you’ll see why in a minute)
> - Do a limit=1 range read on the _changes space for each DB in the list to grab the latest Sequence
> - Update the _db_updates index with the latest Sequence for each of these databases
> 
> On a second pass, the consumer would read the summary KV from the last pass and compare the previous counters with the current values. If any counters have not been updated in the interval, the consumer would try to clear those from the “counters” subspace (adding them as explicit conflict keys to ensure we don’t miss a concurrent update). It would then proceed with the rest of the logic from the initial pass. This is a careful balancing act:
> 
> - We don’t want to pollute the “counters” subspace with idle databases because each entry requires an extra read of _changes
> - We don’t want to attempt to clear counters that are constantly updated because that’s going to fail with a conflict every time
> 
> The scalability axis here is the number of databases updated within any short window of time (~1 second or less). If we end up with that number growing large we can have consumers responsible for range of the “counters” subspace, though I think that’s less likely than in the queue-based design.
> 
> I don’t know in detail what optimizations FoundationDB applies to atomic operations (e.g. coalescing them at a layer above the storage engine). That’s worth checking into, as otherwise I’d be concerned about introducing super-hot keys here.
> 
> This option does not handle the “created” and “deleted” lifecycle events for each database, but those are really quite simple and could really be inserted directly into the _db_updates index.
> 
> ===
> 
> There are some additional details which can be fleshed out in an RFC, but this is the basic gist of things. Both designs would be more robust at capturing every single updated database (because the enqueue/increment operation would be part of the document update transaction). They would allow for a small delay between the document update and the appearance of the database in _db_updates, which is no different than we have today. They each require a background process.
> 
> Let’s hear what you think, both about the interest level for this feature and any comments on the designs. I may take this one over to the FDB forums as well for feedback. Cheers,
> 
> Adam

-- 
Professional Support for Apache CouchDB:
https://neighbourhood.ie/couchdb-support/


Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Adam Kocoloski <ko...@apache.org>.
Thanks Alex - I need to get in the habit of posting more frequently over there :)

Adam

> On Apr 10, 2019, at 3:16 AM, Alex Miller <al...@apple.com.INVALID> wrote:
> 
> 
>> On 2019/03/20 22:47:42, Adam Kocoloski <ko...@apache.org> wrote: 
>> I don’t know in detail what optimizations FoundationDB applies to atomic operations (e.g. coalescing them at a layer above the storage engine). That’s worth checking into, as otherwise I’d be concerned about introducing super-hot keys here.
> 
> 
> Inducing hot keys or hot shards would both be of concern.  There is logic that will try to deal with write hotspots, but it splits shard based on write bandwidth to a particular shard, which likely wouldn’t be triggered by a stream of atomic operations.
> 
> Clients will merge atomic operations together if you issue multiple in a transaction, e.g. I believe three ATOMIC_ADDs on one key will become one ATOMIC_ADD of the sum.  There isn’t particularly any logic that optimizes the handling of atomic operations once they leave the client.  Storage servers only commit every ~250ms, so this shouldn’t translate into a full page write on each atomic operation to the same key, but the storage server will end up applying each atomic operation individually.
> 
> ( And I’ve gone and filed https://github.com/apple/foundationdb/issues/1450 to trigger more discussion within FoundationDB on this topic. )
> 
> 
>> On Mar 25, 2019, at 11:42 AM, Adam Kocoloski <ko...@apache.org> wrote:
>>> On Mar 25, 2019, at 12:48 PM, Mike Rhodes <co...@dx13.co.uk> wrote:
>>> 
>>> I couldn't immediately see how we cleared out older entries from this potentially very large queue. For example, the worker processing the queue to deduplicate might issue range deletes after processing each "batch". Is this simple enough to do?
>> 
>> Yes, that’s the (implicit) idea. Simple to implement, not clear to me how well the storage servers can handle the load. I think the “range clears are cheap” statement largely refers to the transaction management system.
> 
> At the storage level, range clears are cheap in terms of immediate execution, and slow in terms of total work performed.  You can find the details of this in https://forums.foundationdb.org/t/shards-are-not-splitted-into-smaller-ones/815/4 , and an example of what to be careful about with range clears on https://forums.foundationdb.org/t/used-disk-space-dramatically-increases-while-sum-of-key-value-sizes-is-constant/644
> 
> 
>> On Mar 27, 2019, at 11:07 AM, Ilya Khlopotov <ii...@apache.org> wrote:
>> What if FDB would support a list type for a value and would have an atomic operation to add the value to the list if it is missing. In this case we could store the data we need as follows (under separate subspace TBD).
>> VersionStamp = (DbName, EventType)
>> DbName = [versionstamps]
>> ...
>> The question is how we would implement atomic addition of a value to a list. 
> 
> If I’ve understood your proposal correctly, I think it sounds easier to just use APPEND_IF_FITS ?
> 
> https://apple.github.io/foundationdb/javadoc/index.html?com/apple/foundationdb/MutationType.html
> 


Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Alex Miller <al...@apple.com.INVALID>.
> On 2019/03/20 22:47:42, Adam Kocoloski <ko...@apache.org> wrote: 
> I don’t know in detail what optimizations FoundationDB applies to atomic operations (e.g. coalescing them at a layer above the storage engine). That’s worth checking into, as otherwise I’d be concerned about introducing super-hot keys here.


Inducing hot keys or hot shards would both be of concern.  There is logic that will try to deal with write hotspots, but it splits shard based on write bandwidth to a particular shard, which likely wouldn’t be triggered by a stream of atomic operations.

Clients will merge atomic operations together if you issue multiple in a transaction, e.g. I believe three ATOMIC_ADDs on one key will become one ATOMIC_ADD of the sum.  There isn’t particularly any logic that optimizes the handling of atomic operations once they leave the client.  Storage servers only commit every ~250ms, so this shouldn’t translate into a full page write on each atomic operation to the same key, but the storage server will end up applying each atomic operation individually.

( And I’ve gone and filed https://github.com/apple/foundationdb/issues/1450 to trigger more discussion within FoundationDB on this topic. )


> On Mar 25, 2019, at 11:42 AM, Adam Kocoloski <ko...@apache.org> wrote:
>> On Mar 25, 2019, at 12:48 PM, Mike Rhodes <co...@dx13.co.uk> wrote:
>> 
>> I couldn't immediately see how we cleared out older entries from this potentially very large queue. For example, the worker processing the queue to deduplicate might issue range deletes after processing each "batch". Is this simple enough to do?
> 
> Yes, that’s the (implicit) idea. Simple to implement, not clear to me how well the storage servers can handle the load. I think the “range clears are cheap” statement largely refers to the transaction management system.

At the storage level, range clears are cheap in terms of immediate execution, and slow in terms of total work performed.  You can find the details of this in https://forums.foundationdb.org/t/shards-are-not-splitted-into-smaller-ones/815/4 , and an example of what to be careful about with range clears on https://forums.foundationdb.org/t/used-disk-space-dramatically-increases-while-sum-of-key-value-sizes-is-constant/644


> On Mar 27, 2019, at 11:07 AM, Ilya Khlopotov <ii...@apache.org> wrote:
> What if FDB would support a list type for a value and would have an atomic operation to add the value to the list if it is missing. In this case we could store the data we need as follows (under separate subspace TBD).
> VersionStamp = (DbName, EventType)
> DbName = [versionstamps]
> ...
> The question is how we would implement atomic addition of a value to a list. 

If I’ve understood your proposal correctly, I think it sounds easier to just use APPEND_IF_FITS ?

https://apple.github.io/foundationdb/javadoc/index.html?com/apple/foundationdb/MutationType.html


Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Ilya Khlopotov <ii...@apache.org>.
> Due to scoping of the de-duplication operation to single database and use of random sampling we would be able to cleanup frequently updated operation at a different rate than less frequently updated ones. 

There is a typo there ^^^. It should be corrected as:
Due to scoping of the de-duplication operation to single database and use of random sampling we would be able to cleanup frequently updated *databases* at a different rate than less frequently updated ones. 

On 2019/03/27 21:55:14, Ilya Khlopotov <ii...@apache.org> wrote: 
> 
> 
> > I don’t understand why you want to atomically append to an array here instead of using a separate 
> > (DbName, Versionstamp) KV each time. What’s the advantage? Both structures require periodic 
> > cleanup. I also don’t understand why you need this DbName -> Versionstamp mapping at all. Is there > a reason to do some per-database cleanup on the contents of this global feed?
> The idea is to amortize the cleanup/de-duplication cost. We can trigger cleanup from the write transactions chosen by random sampling. However, we want to constrain cleanup to the events of a single database to avoid coordination between multiple de-duplication processes. Therefore, we need to maintain a history of updates to the database (since we use the versionstamp as a key). In this context (DbName, Versionstamp) is a very good idea. Because it would allow us to use standard range operations instead of messing with IBLTs.
> 
> # Summary
> 
> - Every update transaction would write following 
>    Sequence = (DbName, EventType)
>    (DbName, Sequence) = True
> - When update transaction is finished we would generate random number to decide if we need to trigger de-duplication
> - If we need to trigger de-duplication we would spawn a new process and pass the name of the database to it
> - In that process we would do the following
>   maxSequence = 0
>   for _, Sequence in range(DbName, *):
>      - remove oldest entries in "Sequence -> (DbName, EventType)" mapping
>      - materialize the results in more consumable form
>      maxSequence = max(maxSequence, Sequence)
>   - issue delete range request range((DbName, *), last_less_than((DbName, maxSequence))
> 
> Due to scoping of the de-duplication operation to single database and use of random sampling we would be able to cleanup frequently updated operation at a different rate than less frequently updated ones. 
> 
> I hope it does make sense.
> 
> On 2019/03/27 20:33:16, Adam Kocoloski <ko...@apache.org> wrote: 
> >  Hi Ilya,
> > 
> > I agree it would be quite nice if there was a way to implement this feature without a background worker — while also avoiding write contention for transactions that would otherwise not conflict with one another. I’m not sure it’s possible.
> > 
> > I have a few comments:
> > 
> > > We could maintain database level Sequence number and store global changes feed in the following form:
> > >   UpdateSequence = (DbName, EventType, PreviousUpdateSequence)
> > 
> > Tracking a database-wide “latest Sequence” in a single KV would mean we can’t execute any transactions on that database in parallel, so yet another reason why that strawman approach route cannot work.
> > 
> > > In this case we could store the data we need as follows (under separate subspace TBD).
> > > VersionStamp = (DbName, EventType)
> > > DbName = [versionstamps]
> > 
> > I don’t understand why you want to atomically append to an array here instead of using a separate (DbName, Versionstamp) KV each time. What’s the advantage? Both structures require periodic cleanup. I also don’t understand why you need this DbName -> Versionstamp mapping at all. Is there a reason to do some per-database cleanup on the contents of this global feed?
> > 
> > Cheers, Adam
> > 
> > 
> > > On Mar 27, 2019, at 2:07 PM, Ilya Khlopotov <ii...@apache.org> wrote:
> > > 
> > > Hi, 
> > > 
> > > Both proposals are fine but need a consumer process. Which is a tricky requirement because it will lead to problems in cases when queue grows faster than we can consume it. This realization got me thinking about finding possible ways to eliminate the need for a consumer.
> > > 
> > > I wouldn't spell out the final solution right away since I want to demonstrate the thinking process so others could build better proposals on top of it. 
> > > 
> > > Essentially, we need to de-duplicate events. In order to do that we need to know when given database was updated last time. We could maintain database level Sequence number and store global changes feed in the following form:
> > >   UpdateSequence = (DbName, EventType, PreviousUpdateSequence)
> > > 
> > > Then every 10th (or 100th or 1000th) transaction can trigger a compaction process for updated database. It would use PreviousUpdateSequence to get pointer to get its parent, read pointer to grandparent, cleanup parent, and so on so force until we wouldn't have anything to clean up.
> > > 
> > > This is a terrible idea for the following reasons:
> > > - Including UpdateSequence is expensive since we would need to add one more read to every update transaction
> > > - recursion to do cleanup is expensive and most likely would need to be done in multiple transactions
> > > 
> > > What if FDB would support a list type for a value and would have an atomic operation to add the value to the list if it is missing. In this case we could store the data we need as follows (under separate subspace TBD).
> > > VersionStamp = (DbName, EventType)
> > > DbName = [versionstamps]
> > > 
> > > In this case in order to de-duplicate events, we would do the following:
> > > - every once in a while (every 10th (or 100th or 1000th) update transaction (we would use PRNG ) to specific database) would execute compaction algorithm 
> > > - Read list of versionstamps for older updates and issue remove operations for every version stamp except the biggest one
> > > - update history value to include only biggest versionstamp
> > > 
> > > The question is how we would implement atomic addition of a value to a list. There is an IBLT data structure (https://arxiv.org/pdf/1101.2245.pdf) which can help us to achieve that. IBLT consists of the multiple cells where every cell has the following fields:
> > > - count
> > > - keySum
> > > - valueSum
> > > - hashkeySum
> > > 
> > > The beauty of this structure is that all fields are updated using blind addition operations while supporting enumeration of all key-values stored in the structure (with configurable probability). Which is available in FDB (aka atomic addition).
> > > 
> > > For our specific case it doesn't look like we need valueSum (because we only need keys) and hashkeySum (because we wouldn't have duplicates), so we can simplify the structure.
> > > 
> > > Best regards,
> > > iilyak
> > > 
> > > 
> > > On 2019/03/20 22:47:42, Adam Kocoloski <ko...@apache.org> wrote: 
> > >> Hi all,
> > >> 
> > >> Most of the discussions so far have focused on the core features that are fundamental to CouchDB: JSON documents, revision tracking, _changes. I thought I’d start a thread on something a bit different: the _db_updates feed.
> > >> 
> > >> The _db_updates feed is an API that enables users to discover database lifecycle events across an entire CouchDB instance. It’s primarily useful in deployments that have lots and lots of databases, where it’s impractical to keep connections open for every database, and where database creations and deletions may be an automated aspect of the application’s use of CouchDB.
> > >> 
> > >> There are really two topics for discussion here. The first is: do we need to keep it? The primary driver of applications creating lots of DBs is the per-DB granularity of access controls; if we go down the route of implementing the document-level _access proposal perhaps users naturally migrate away from this DB-per-user data model. I’d be curious to hear points of view there.
> > >> 
> > >> I’ll assume for now that we do want to keep it, and offer some thoughts on how to implement it. The main challenge with _db_updates is managing the write contention; in write-heavy databases you have a lot of producers trying to tag that particular database as “updated", but all the consumer really cares about is getting a single “dbname”:”updated” event as needed. In the current architecture we try to dedupe a lot of the events in-memory before updating a regular CouchDB database with this information, but this leaves us exposed to possibly dropping events within a few second window.
> > >> 
> > >> ## Option 1: Queue + Compaction
> > >> 
> > >> One way to tackle this in FoundationDB is to have an intermediate subspace reserved as a queue. Each transaction that modifies a database would insert a versionstamped KV into the queue like
> > >> 
> > >> Versionstamp = (DbName, EventType)
> > >> 
> > >> Versionstamps are monotonically increasing and inserting versionstamped keys is a conflict-free operation. We’d have a consumer of this queue which is responsible for “log compaction”; i.e., the consumer would do range reads on the queue subspace, toss out duplicate contiguous “dbname”:“updated” events, and update a second index which would look more like the _changes feed.
> > >> 
> > >> ### Scaling Consumers
> > >> 
> > >> A single consumer can likely process 10k events/sec or more, but eventually we’ll need to scale. Borrowing from systems like Kafka the typical way to do this is to divide the queue into partitions and have individual consumers mapped to each partition. A partition in this model would just be a prefix on the Versionstamp:
> > >> 
> > >> (PartitionID, Versionstamp) = (DbName, EventType)
> > >> 
> > >> Our consumers will be more efficient and less likely to conflict with one another on updating the _db_updates index if messages are keyed to a partition based on DbName, although this still runs the risk that a couple of high-throughput databases could swamp a partition.
> > >> 
> > >> I’m not sure about the best path forward for handling that scenario. One could implement a rate-limiter that starts sloughing off additional messages for high-throughput databases (which has some careful edge cases), split the messages for a single database across multiple partitions, rely on operators to blacklist certain databases from the _db_updates system, etc. Each has downsides.
> > >> 
> > >> ## Option 2: Atomic Ops + Consumer
> > >> 
> > >> In this approach we still have an intermediate subspace, and a consumer of that subspace which updates the _db_updates index. But this time, we have at most one KV per database in the subspace, with an atomic counter for a value. When a document is updated it bumps the counter for its database in that subspace. So we’ll have entries like
> > >> 
> > >> (“counters”, “db1235”) = 1
> > >> (“counters”, “db0001”) = 42
> > >> (“counters”, “telemetry-db”) = 12312
> > >> 
> > >> and so on. Like versionstamps, atomic operations are conflict-free so we need not worry about introducing spurious conflicts on high-throughput databases.
> > >> 
> > >> The initial pass of the consumer logic would go something like this:
> > >> 
> > >> - Do a snapshot range read of the “counters” subspace (or whatever we call it)
> > >> - Record the current values for all counters in a separate summary KV (you’ll see why in a minute)
> > >> - Do a limit=1 range read on the _changes space for each DB in the list to grab the latest Sequence
> > >> - Update the _db_updates index with the latest Sequence for each of these databases
> > >> 
> > >> On a second pass, the consumer would read the summary KV from the last pass and compare the previous counters with the current values. If any counters have not been updated in the interval, the consumer would try to clear those from the “counters” subspace (adding them as explicit conflict keys to ensure we don’t miss a concurrent update). It would then proceed with the rest of the logic from the initial pass. This is a careful balancing act:
> > >> 
> > >> - We don’t want to pollute the “counters” subspace with idle databases because each entry requires an extra read of _changes
> > >> - We don’t want to attempt to clear counters that are constantly updated because that’s going to fail with a conflict every time
> > >> 
> > >> The scalability axis here is the number of databases updated within any short window of time (~1 second or less). If we end up with that number growing large we can have consumers responsible for range of the “counters” subspace, though I think that’s less likely than in the queue-based design.
> > >> 
> > >> I don’t know in detail what optimizations FoundationDB applies to atomic operations (e.g. coalescing them at a layer above the storage engine). That’s worth checking into, as otherwise I’d be concerned about introducing super-hot keys here.
> > >> 
> > >> This option does not handle the “created” and “deleted” lifecycle events for each database, but those are really quite simple and could really be inserted directly into the _db_updates index.
> > >> 
> > >> ===
> > >> 
> > >> There are some additional details which can be fleshed out in an RFC, but this is the basic gist of things. Both designs would be more robust at capturing every single updated database (because the enqueue/increment operation would be part of the document update transaction). They would allow for a small delay between the document update and the appearance of the database in _db_updates, which is no different than we have today. They each require a background process.
> > >> 
> > >> Let’s hear what you think, both about the interest level for this feature and any comments on the designs. I may take this one over to the FDB forums as well for feedback. Cheers,
> > >> 
> > >> Adam
> > 
> > 
> 

Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Ilya Khlopotov <ii...@apache.org>.

> I don’t understand why you want to atomically append to an array here instead of using a separate 
> (DbName, Versionstamp) KV each time. What’s the advantage? Both structures require periodic 
> cleanup. I also don’t understand why you need this DbName -> Versionstamp mapping at all. Is there > a reason to do some per-database cleanup on the contents of this global feed?
The idea is to amortize the cleanup/de-duplication cost. We can trigger cleanup from the write transactions chosen by random sampling. However, we want to constrain cleanup to the events of a single database to avoid coordination between multiple de-duplication processes. Therefore, we need to maintain a history of updates to the database (since we use the versionstamp as a key). In this context (DbName, Versionstamp) is a very good idea. Because it would allow us to use standard range operations instead of messing with IBLTs.

# Summary

- Every update transaction would write following 
   Sequence = (DbName, EventType)
   (DbName, Sequence) = True
- When update transaction is finished we would generate random number to decide if we need to trigger de-duplication
- If we need to trigger de-duplication we would spawn a new process and pass the name of the database to it
- In that process we would do the following
  maxSequence = 0
  for _, Sequence in range(DbName, *):
     - remove oldest entries in "Sequence -> (DbName, EventType)" mapping
     - materialize the results in more consumable form
     maxSequence = max(maxSequence, Sequence)
  - issue delete range request range((DbName, *), last_less_than((DbName, maxSequence))

Due to scoping of the de-duplication operation to single database and use of random sampling we would be able to cleanup frequently updated operation at a different rate than less frequently updated ones. 

I hope it does make sense.

On 2019/03/27 20:33:16, Adam Kocoloski <ko...@apache.org> wrote: 
>  Hi Ilya,
> 
> I agree it would be quite nice if there was a way to implement this feature without a background worker — while also avoiding write contention for transactions that would otherwise not conflict with one another. I’m not sure it’s possible.
> 
> I have a few comments:
> 
> > We could maintain database level Sequence number and store global changes feed in the following form:
> >   UpdateSequence = (DbName, EventType, PreviousUpdateSequence)
> 
> Tracking a database-wide “latest Sequence” in a single KV would mean we can’t execute any transactions on that database in parallel, so yet another reason why that strawman approach route cannot work.
> 
> > In this case we could store the data we need as follows (under separate subspace TBD).
> > VersionStamp = (DbName, EventType)
> > DbName = [versionstamps]
> 
> I don’t understand why you want to atomically append to an array here instead of using a separate (DbName, Versionstamp) KV each time. What’s the advantage? Both structures require periodic cleanup. I also don’t understand why you need this DbName -> Versionstamp mapping at all. Is there a reason to do some per-database cleanup on the contents of this global feed?
> 
> Cheers, Adam
> 
> 
> > On Mar 27, 2019, at 2:07 PM, Ilya Khlopotov <ii...@apache.org> wrote:
> > 
> > Hi, 
> > 
> > Both proposals are fine but need a consumer process. Which is a tricky requirement because it will lead to problems in cases when queue grows faster than we can consume it. This realization got me thinking about finding possible ways to eliminate the need for a consumer.
> > 
> > I wouldn't spell out the final solution right away since I want to demonstrate the thinking process so others could build better proposals on top of it. 
> > 
> > Essentially, we need to de-duplicate events. In order to do that we need to know when given database was updated last time. We could maintain database level Sequence number and store global changes feed in the following form:
> >   UpdateSequence = (DbName, EventType, PreviousUpdateSequence)
> > 
> > Then every 10th (or 100th or 1000th) transaction can trigger a compaction process for updated database. It would use PreviousUpdateSequence to get pointer to get its parent, read pointer to grandparent, cleanup parent, and so on so force until we wouldn't have anything to clean up.
> > 
> > This is a terrible idea for the following reasons:
> > - Including UpdateSequence is expensive since we would need to add one more read to every update transaction
> > - recursion to do cleanup is expensive and most likely would need to be done in multiple transactions
> > 
> > What if FDB would support a list type for a value and would have an atomic operation to add the value to the list if it is missing. In this case we could store the data we need as follows (under separate subspace TBD).
> > VersionStamp = (DbName, EventType)
> > DbName = [versionstamps]
> > 
> > In this case in order to de-duplicate events, we would do the following:
> > - every once in a while (every 10th (or 100th or 1000th) update transaction (we would use PRNG ) to specific database) would execute compaction algorithm 
> > - Read list of versionstamps for older updates and issue remove operations for every version stamp except the biggest one
> > - update history value to include only biggest versionstamp
> > 
> > The question is how we would implement atomic addition of a value to a list. There is an IBLT data structure (https://arxiv.org/pdf/1101.2245.pdf) which can help us to achieve that. IBLT consists of the multiple cells where every cell has the following fields:
> > - count
> > - keySum
> > - valueSum
> > - hashkeySum
> > 
> > The beauty of this structure is that all fields are updated using blind addition operations while supporting enumeration of all key-values stored in the structure (with configurable probability). Which is available in FDB (aka atomic addition).
> > 
> > For our specific case it doesn't look like we need valueSum (because we only need keys) and hashkeySum (because we wouldn't have duplicates), so we can simplify the structure.
> > 
> > Best regards,
> > iilyak
> > 
> > 
> > On 2019/03/20 22:47:42, Adam Kocoloski <ko...@apache.org> wrote: 
> >> Hi all,
> >> 
> >> Most of the discussions so far have focused on the core features that are fundamental to CouchDB: JSON documents, revision tracking, _changes. I thought I’d start a thread on something a bit different: the _db_updates feed.
> >> 
> >> The _db_updates feed is an API that enables users to discover database lifecycle events across an entire CouchDB instance. It’s primarily useful in deployments that have lots and lots of databases, where it’s impractical to keep connections open for every database, and where database creations and deletions may be an automated aspect of the application’s use of CouchDB.
> >> 
> >> There are really two topics for discussion here. The first is: do we need to keep it? The primary driver of applications creating lots of DBs is the per-DB granularity of access controls; if we go down the route of implementing the document-level _access proposal perhaps users naturally migrate away from this DB-per-user data model. I’d be curious to hear points of view there.
> >> 
> >> I’ll assume for now that we do want to keep it, and offer some thoughts on how to implement it. The main challenge with _db_updates is managing the write contention; in write-heavy databases you have a lot of producers trying to tag that particular database as “updated", but all the consumer really cares about is getting a single “dbname”:”updated” event as needed. In the current architecture we try to dedupe a lot of the events in-memory before updating a regular CouchDB database with this information, but this leaves us exposed to possibly dropping events within a few second window.
> >> 
> >> ## Option 1: Queue + Compaction
> >> 
> >> One way to tackle this in FoundationDB is to have an intermediate subspace reserved as a queue. Each transaction that modifies a database would insert a versionstamped KV into the queue like
> >> 
> >> Versionstamp = (DbName, EventType)
> >> 
> >> Versionstamps are monotonically increasing and inserting versionstamped keys is a conflict-free operation. We’d have a consumer of this queue which is responsible for “log compaction”; i.e., the consumer would do range reads on the queue subspace, toss out duplicate contiguous “dbname”:“updated” events, and update a second index which would look more like the _changes feed.
> >> 
> >> ### Scaling Consumers
> >> 
> >> A single consumer can likely process 10k events/sec or more, but eventually we’ll need to scale. Borrowing from systems like Kafka the typical way to do this is to divide the queue into partitions and have individual consumers mapped to each partition. A partition in this model would just be a prefix on the Versionstamp:
> >> 
> >> (PartitionID, Versionstamp) = (DbName, EventType)
> >> 
> >> Our consumers will be more efficient and less likely to conflict with one another on updating the _db_updates index if messages are keyed to a partition based on DbName, although this still runs the risk that a couple of high-throughput databases could swamp a partition.
> >> 
> >> I’m not sure about the best path forward for handling that scenario. One could implement a rate-limiter that starts sloughing off additional messages for high-throughput databases (which has some careful edge cases), split the messages for a single database across multiple partitions, rely on operators to blacklist certain databases from the _db_updates system, etc. Each has downsides.
> >> 
> >> ## Option 2: Atomic Ops + Consumer
> >> 
> >> In this approach we still have an intermediate subspace, and a consumer of that subspace which updates the _db_updates index. But this time, we have at most one KV per database in the subspace, with an atomic counter for a value. When a document is updated it bumps the counter for its database in that subspace. So we’ll have entries like
> >> 
> >> (“counters”, “db1235”) = 1
> >> (“counters”, “db0001”) = 42
> >> (“counters”, “telemetry-db”) = 12312
> >> 
> >> and so on. Like versionstamps, atomic operations are conflict-free so we need not worry about introducing spurious conflicts on high-throughput databases.
> >> 
> >> The initial pass of the consumer logic would go something like this:
> >> 
> >> - Do a snapshot range read of the “counters” subspace (or whatever we call it)
> >> - Record the current values for all counters in a separate summary KV (you’ll see why in a minute)
> >> - Do a limit=1 range read on the _changes space for each DB in the list to grab the latest Sequence
> >> - Update the _db_updates index with the latest Sequence for each of these databases
> >> 
> >> On a second pass, the consumer would read the summary KV from the last pass and compare the previous counters with the current values. If any counters have not been updated in the interval, the consumer would try to clear those from the “counters” subspace (adding them as explicit conflict keys to ensure we don’t miss a concurrent update). It would then proceed with the rest of the logic from the initial pass. This is a careful balancing act:
> >> 
> >> - We don’t want to pollute the “counters” subspace with idle databases because each entry requires an extra read of _changes
> >> - We don’t want to attempt to clear counters that are constantly updated because that’s going to fail with a conflict every time
> >> 
> >> The scalability axis here is the number of databases updated within any short window of time (~1 second or less). If we end up with that number growing large we can have consumers responsible for range of the “counters” subspace, though I think that’s less likely than in the queue-based design.
> >> 
> >> I don’t know in detail what optimizations FoundationDB applies to atomic operations (e.g. coalescing them at a layer above the storage engine). That’s worth checking into, as otherwise I’d be concerned about introducing super-hot keys here.
> >> 
> >> This option does not handle the “created” and “deleted” lifecycle events for each database, but those are really quite simple and could really be inserted directly into the _db_updates index.
> >> 
> >> ===
> >> 
> >> There are some additional details which can be fleshed out in an RFC, but this is the basic gist of things. Both designs would be more robust at capturing every single updated database (because the enqueue/increment operation would be part of the document update transaction). They would allow for a small delay between the document update and the appearance of the database in _db_updates, which is no different than we have today. They each require a background process.
> >> 
> >> Let’s hear what you think, both about the interest level for this feature and any comments on the designs. I may take this one over to the FDB forums as well for feedback. Cheers,
> >> 
> >> Adam
> 
> 

Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Adam Kocoloski <ko...@apache.org>.
 Hi Ilya,

I agree it would be quite nice if there was a way to implement this feature without a background worker — while also avoiding write contention for transactions that would otherwise not conflict with one another. I’m not sure it’s possible.

I have a few comments:

> We could maintain database level Sequence number and store global changes feed in the following form:
>   UpdateSequence = (DbName, EventType, PreviousUpdateSequence)

Tracking a database-wide “latest Sequence” in a single KV would mean we can’t execute any transactions on that database in parallel, so yet another reason why that strawman approach route cannot work.

> In this case we could store the data we need as follows (under separate subspace TBD).
> VersionStamp = (DbName, EventType)
> DbName = [versionstamps]

I don’t understand why you want to atomically append to an array here instead of using a separate (DbName, Versionstamp) KV each time. What’s the advantage? Both structures require periodic cleanup. I also don’t understand why you need this DbName -> Versionstamp mapping at all. Is there a reason to do some per-database cleanup on the contents of this global feed?

Cheers, Adam


> On Mar 27, 2019, at 2:07 PM, Ilya Khlopotov <ii...@apache.org> wrote:
> 
> Hi, 
> 
> Both proposals are fine but need a consumer process. Which is a tricky requirement because it will lead to problems in cases when queue grows faster than we can consume it. This realization got me thinking about finding possible ways to eliminate the need for a consumer.
> 
> I wouldn't spell out the final solution right away since I want to demonstrate the thinking process so others could build better proposals on top of it. 
> 
> Essentially, we need to de-duplicate events. In order to do that we need to know when given database was updated last time. We could maintain database level Sequence number and store global changes feed in the following form:
>   UpdateSequence = (DbName, EventType, PreviousUpdateSequence)
> 
> Then every 10th (or 100th or 1000th) transaction can trigger a compaction process for updated database. It would use PreviousUpdateSequence to get pointer to get its parent, read pointer to grandparent, cleanup parent, and so on so force until we wouldn't have anything to clean up.
> 
> This is a terrible idea for the following reasons:
> - Including UpdateSequence is expensive since we would need to add one more read to every update transaction
> - recursion to do cleanup is expensive and most likely would need to be done in multiple transactions
> 
> What if FDB would support a list type for a value and would have an atomic operation to add the value to the list if it is missing. In this case we could store the data we need as follows (under separate subspace TBD).
> VersionStamp = (DbName, EventType)
> DbName = [versionstamps]
> 
> In this case in order to de-duplicate events, we would do the following:
> - every once in a while (every 10th (or 100th or 1000th) update transaction (we would use PRNG ) to specific database) would execute compaction algorithm 
> - Read list of versionstamps for older updates and issue remove operations for every version stamp except the biggest one
> - update history value to include only biggest versionstamp
> 
> The question is how we would implement atomic addition of a value to a list. There is an IBLT data structure (https://arxiv.org/pdf/1101.2245.pdf) which can help us to achieve that. IBLT consists of the multiple cells where every cell has the following fields:
> - count
> - keySum
> - valueSum
> - hashkeySum
> 
> The beauty of this structure is that all fields are updated using blind addition operations while supporting enumeration of all key-values stored in the structure (with configurable probability). Which is available in FDB (aka atomic addition).
> 
> For our specific case it doesn't look like we need valueSum (because we only need keys) and hashkeySum (because we wouldn't have duplicates), so we can simplify the structure.
> 
> Best regards,
> iilyak
> 
> 
> On 2019/03/20 22:47:42, Adam Kocoloski <ko...@apache.org> wrote: 
>> Hi all,
>> 
>> Most of the discussions so far have focused on the core features that are fundamental to CouchDB: JSON documents, revision tracking, _changes. I thought I’d start a thread on something a bit different: the _db_updates feed.
>> 
>> The _db_updates feed is an API that enables users to discover database lifecycle events across an entire CouchDB instance. It’s primarily useful in deployments that have lots and lots of databases, where it’s impractical to keep connections open for every database, and where database creations and deletions may be an automated aspect of the application’s use of CouchDB.
>> 
>> There are really two topics for discussion here. The first is: do we need to keep it? The primary driver of applications creating lots of DBs is the per-DB granularity of access controls; if we go down the route of implementing the document-level _access proposal perhaps users naturally migrate away from this DB-per-user data model. I’d be curious to hear points of view there.
>> 
>> I’ll assume for now that we do want to keep it, and offer some thoughts on how to implement it. The main challenge with _db_updates is managing the write contention; in write-heavy databases you have a lot of producers trying to tag that particular database as “updated", but all the consumer really cares about is getting a single “dbname”:”updated” event as needed. In the current architecture we try to dedupe a lot of the events in-memory before updating a regular CouchDB database with this information, but this leaves us exposed to possibly dropping events within a few second window.
>> 
>> ## Option 1: Queue + Compaction
>> 
>> One way to tackle this in FoundationDB is to have an intermediate subspace reserved as a queue. Each transaction that modifies a database would insert a versionstamped KV into the queue like
>> 
>> Versionstamp = (DbName, EventType)
>> 
>> Versionstamps are monotonically increasing and inserting versionstamped keys is a conflict-free operation. We’d have a consumer of this queue which is responsible for “log compaction”; i.e., the consumer would do range reads on the queue subspace, toss out duplicate contiguous “dbname”:“updated” events, and update a second index which would look more like the _changes feed.
>> 
>> ### Scaling Consumers
>> 
>> A single consumer can likely process 10k events/sec or more, but eventually we’ll need to scale. Borrowing from systems like Kafka the typical way to do this is to divide the queue into partitions and have individual consumers mapped to each partition. A partition in this model would just be a prefix on the Versionstamp:
>> 
>> (PartitionID, Versionstamp) = (DbName, EventType)
>> 
>> Our consumers will be more efficient and less likely to conflict with one another on updating the _db_updates index if messages are keyed to a partition based on DbName, although this still runs the risk that a couple of high-throughput databases could swamp a partition.
>> 
>> I’m not sure about the best path forward for handling that scenario. One could implement a rate-limiter that starts sloughing off additional messages for high-throughput databases (which has some careful edge cases), split the messages for a single database across multiple partitions, rely on operators to blacklist certain databases from the _db_updates system, etc. Each has downsides.
>> 
>> ## Option 2: Atomic Ops + Consumer
>> 
>> In this approach we still have an intermediate subspace, and a consumer of that subspace which updates the _db_updates index. But this time, we have at most one KV per database in the subspace, with an atomic counter for a value. When a document is updated it bumps the counter for its database in that subspace. So we’ll have entries like
>> 
>> (“counters”, “db1235”) = 1
>> (“counters”, “db0001”) = 42
>> (“counters”, “telemetry-db”) = 12312
>> 
>> and so on. Like versionstamps, atomic operations are conflict-free so we need not worry about introducing spurious conflicts on high-throughput databases.
>> 
>> The initial pass of the consumer logic would go something like this:
>> 
>> - Do a snapshot range read of the “counters” subspace (or whatever we call it)
>> - Record the current values for all counters in a separate summary KV (you’ll see why in a minute)
>> - Do a limit=1 range read on the _changes space for each DB in the list to grab the latest Sequence
>> - Update the _db_updates index with the latest Sequence for each of these databases
>> 
>> On a second pass, the consumer would read the summary KV from the last pass and compare the previous counters with the current values. If any counters have not been updated in the interval, the consumer would try to clear those from the “counters” subspace (adding them as explicit conflict keys to ensure we don’t miss a concurrent update). It would then proceed with the rest of the logic from the initial pass. This is a careful balancing act:
>> 
>> - We don’t want to pollute the “counters” subspace with idle databases because each entry requires an extra read of _changes
>> - We don’t want to attempt to clear counters that are constantly updated because that’s going to fail with a conflict every time
>> 
>> The scalability axis here is the number of databases updated within any short window of time (~1 second or less). If we end up with that number growing large we can have consumers responsible for range of the “counters” subspace, though I think that’s less likely than in the queue-based design.
>> 
>> I don’t know in detail what optimizations FoundationDB applies to atomic operations (e.g. coalescing them at a layer above the storage engine). That’s worth checking into, as otherwise I’d be concerned about introducing super-hot keys here.
>> 
>> This option does not handle the “created” and “deleted” lifecycle events for each database, but those are really quite simple and could really be inserted directly into the _db_updates index.
>> 
>> ===
>> 
>> There are some additional details which can be fleshed out in an RFC, but this is the basic gist of things. Both designs would be more robust at capturing every single updated database (because the enqueue/increment operation would be part of the document update transaction). They would allow for a small delay between the document update and the appearance of the database in _db_updates, which is no different than we have today. They each require a background process.
>> 
>> Let’s hear what you think, both about the interest level for this feature and any comments on the designs. I may take this one over to the FDB forums as well for feedback. Cheers,
>> 
>> Adam


Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Ilya Khlopotov <ii...@apache.org>.
Hi, 

Both proposals are fine but need a consumer process. Which is a tricky requirement because it will lead to problems in cases when queue grows faster than we can consume it. This realization got me thinking about finding possible ways to eliminate the need for a consumer.

I wouldn't spell out the final solution right away since I want to demonstrate the thinking process so others could build better proposals on top of it. 

Essentially, we need to de-duplicate events. In order to do that we need to know when given database was updated last time. We could maintain database level Sequence number and store global changes feed in the following form:
   UpdateSequence = (DbName, EventType, PreviousUpdateSequence)

Then every 10th (or 100th or 1000th) transaction can trigger a compaction process for updated database. It would use PreviousUpdateSequence to get pointer to get its parent, read pointer to grandparent, cleanup parent, and so on so force until we wouldn't have anything to clean up.

This is a terrible idea for the following reasons:
- Including UpdateSequence is expensive since we would need to add one more read to every update transaction
- recursion to do cleanup is expensive and most likely would need to be done in multiple transactions

What if FDB would support a list type for a value and would have an atomic operation to add the value to the list if it is missing. In this case we could store the data we need as follows (under separate subspace TBD).
VersionStamp = (DbName, EventType)
DbName = [versionstamps]

In this case in order to de-duplicate events, we would do the following:
- every once in a while (every 10th (or 100th or 1000th) update transaction (we would use PRNG ) to specific database) would execute compaction algorithm 
- Read list of versionstamps for older updates and issue remove operations for every version stamp except the biggest one
- update history value to include only biggest versionstamp

The question is how we would implement atomic addition of a value to a list. There is an IBLT data structure (https://arxiv.org/pdf/1101.2245.pdf) which can help us to achieve that. IBLT consists of the multiple cells where every cell has the following fields:
- count
- keySum
- valueSum
- hashkeySum

The beauty of this structure is that all fields are updated using blind addition operations while supporting enumeration of all key-values stored in the structure (with configurable probability). Which is available in FDB (aka atomic addition).

For our specific case it doesn't look like we need valueSum (because we only need keys) and hashkeySum (because we wouldn't have duplicates), so we can simplify the structure.

Best regards,
iilyak
   

On 2019/03/20 22:47:42, Adam Kocoloski <ko...@apache.org> wrote: 
> Hi all,
> 
> Most of the discussions so far have focused on the core features that are fundamental to CouchDB: JSON documents, revision tracking, _changes. I thought I’d start a thread on something a bit different: the _db_updates feed.
> 
> The _db_updates feed is an API that enables users to discover database lifecycle events across an entire CouchDB instance. It’s primarily useful in deployments that have lots and lots of databases, where it’s impractical to keep connections open for every database, and where database creations and deletions may be an automated aspect of the application’s use of CouchDB.
> 
> There are really two topics for discussion here. The first is: do we need to keep it? The primary driver of applications creating lots of DBs is the per-DB granularity of access controls; if we go down the route of implementing the document-level _access proposal perhaps users naturally migrate away from this DB-per-user data model. I’d be curious to hear points of view there.
> 
> I’ll assume for now that we do want to keep it, and offer some thoughts on how to implement it. The main challenge with _db_updates is managing the write contention; in write-heavy databases you have a lot of producers trying to tag that particular database as “updated", but all the consumer really cares about is getting a single “dbname”:”updated” event as needed. In the current architecture we try to dedupe a lot of the events in-memory before updating a regular CouchDB database with this information, but this leaves us exposed to possibly dropping events within a few second window.
> 
> ## Option 1: Queue + Compaction
> 
> One way to tackle this in FoundationDB is to have an intermediate subspace reserved as a queue. Each transaction that modifies a database would insert a versionstamped KV into the queue like
> 
> Versionstamp = (DbName, EventType)
> 
> Versionstamps are monotonically increasing and inserting versionstamped keys is a conflict-free operation. We’d have a consumer of this queue which is responsible for “log compaction”; i.e., the consumer would do range reads on the queue subspace, toss out duplicate contiguous “dbname”:“updated” events, and update a second index which would look more like the _changes feed.
> 
> ### Scaling Consumers
> 
> A single consumer can likely process 10k events/sec or more, but eventually we’ll need to scale. Borrowing from systems like Kafka the typical way to do this is to divide the queue into partitions and have individual consumers mapped to each partition. A partition in this model would just be a prefix on the Versionstamp:
> 
> (PartitionID, Versionstamp) = (DbName, EventType)
> 
> Our consumers will be more efficient and less likely to conflict with one another on updating the _db_updates index if messages are keyed to a partition based on DbName, although this still runs the risk that a couple of high-throughput databases could swamp a partition.
> 
> I’m not sure about the best path forward for handling that scenario. One could implement a rate-limiter that starts sloughing off additional messages for high-throughput databases (which has some careful edge cases), split the messages for a single database across multiple partitions, rely on operators to blacklist certain databases from the _db_updates system, etc. Each has downsides.
> 
> ## Option 2: Atomic Ops + Consumer
> 
> In this approach we still have an intermediate subspace, and a consumer of that subspace which updates the _db_updates index. But this time, we have at most one KV per database in the subspace, with an atomic counter for a value. When a document is updated it bumps the counter for its database in that subspace. So we’ll have entries like
> 
> (“counters”, “db1235”) = 1
> (“counters”, “db0001”) = 42
> (“counters”, “telemetry-db”) = 12312
> 
> and so on. Like versionstamps, atomic operations are conflict-free so we need not worry about introducing spurious conflicts on high-throughput databases.
> 
> The initial pass of the consumer logic would go something like this:
> 
> - Do a snapshot range read of the “counters” subspace (or whatever we call it)
> - Record the current values for all counters in a separate summary KV (you’ll see why in a minute)
> - Do a limit=1 range read on the _changes space for each DB in the list to grab the latest Sequence
> - Update the _db_updates index with the latest Sequence for each of these databases
> 
> On a second pass, the consumer would read the summary KV from the last pass and compare the previous counters with the current values. If any counters have not been updated in the interval, the consumer would try to clear those from the “counters” subspace (adding them as explicit conflict keys to ensure we don’t miss a concurrent update). It would then proceed with the rest of the logic from the initial pass. This is a careful balancing act:
> 
> - We don’t want to pollute the “counters” subspace with idle databases because each entry requires an extra read of _changes
> - We don’t want to attempt to clear counters that are constantly updated because that’s going to fail with a conflict every time
> 
> The scalability axis here is the number of databases updated within any short window of time (~1 second or less). If we end up with that number growing large we can have consumers responsible for range of the “counters” subspace, though I think that’s less likely than in the queue-based design.
> 
> I don’t know in detail what optimizations FoundationDB applies to atomic operations (e.g. coalescing them at a layer above the storage engine). That’s worth checking into, as otherwise I’d be concerned about introducing super-hot keys here.
> 
> This option does not handle the “created” and “deleted” lifecycle events for each database, but those are really quite simple and could really be inserted directly into the _db_updates index.
> 
> ===
> 
> There are some additional details which can be fleshed out in an RFC, but this is the basic gist of things. Both designs would be more robust at capturing every single updated database (because the enqueue/increment operation would be part of the document update transaction). They would allow for a small delay between the document update and the appearance of the database in _db_updates, which is no different than we have today. They each require a background process.
> 
> Let’s hear what you think, both about the interest level for this feature and any comments on the designs. I may take this one over to the FDB forums as well for feedback. Cheers,
> 
> Adam

Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Adam Kocoloski <ko...@apache.org>.
> On Mar 25, 2019, at 12:48 PM, Mike Rhodes <co...@dx13.co.uk> wrote:
> 
> On Wed, 20 Mar 2019, at 22:47, Adam Kocoloski wrote:
>> 
>> ## Option 1: Queue + Compaction
>> 
>> One way to tackle this in FoundationDB is to have an intermediate 
>> subspace reserved as a queue. Each transaction that modifies a database 
>> would insert a versionstamped KV into the queue like
>> 
>> Versionstamp = (DbName, EventType)
>> 
>> Versionstamps are monotonically increasing and inserting versionstamped 
>> keys is a conflict-free operation. We’d have a consumer of this queue 
>> which is responsible for “log compaction”; i.e., the consumer would do 
>> range reads on the queue subspace, toss out duplicate contiguous 
>> “dbname”:“updated” events, and update a second index which would look 
>> more like the _changes feed.
> 
> I couldn't immediately see how we cleared out older entries from this potentially very large queue. For example, the worker processing the queue to deduplicate might issue range deletes after processing each "batch". Is this simple enough to do?
> 
> Mike.

Yes, that’s the (implicit) idea. Simple to implement, not clear to me how well the storage servers can handle the load. I think the “range clears are cheap” statement largely refers to the transaction management system.

Adam

Re: [DISCUSS] _db_updates feed in FoundationDB

Posted by Mike Rhodes <co...@dx13.co.uk>.
On Wed, 20 Mar 2019, at 22:47, Adam Kocoloski wrote:
> 
> ## Option 1: Queue + Compaction
> 
> One way to tackle this in FoundationDB is to have an intermediate 
> subspace reserved as a queue. Each transaction that modifies a database 
> would insert a versionstamped KV into the queue like
> 
> Versionstamp = (DbName, EventType)
> 
> Versionstamps are monotonically increasing and inserting versionstamped 
> keys is a conflict-free operation. We’d have a consumer of this queue 
> which is responsible for “log compaction”; i.e., the consumer would do 
> range reads on the queue subspace, toss out duplicate contiguous 
> “dbname”:“updated” events, and update a second index which would look 
> more like the _changes feed.

I couldn't immediately see how we cleared out older entries from this potentially very large queue. For example, the worker processing the queue to deduplicate might issue range deletes after processing each "batch". Is this simple enough to do?

Mike.