Posted to notifications@couchdb.apache.org by GitBox <gi...@apache.org> on 2021/04/06 22:12:51 UTC

[GitHub] [couchdb-documentation] kocolosk opened a new pull request #651: Add RFC on sharded changes

kocolosk opened a new pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651


   Opening a PR for discussion on this RFC


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [couchdb-documentation] ricellis commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
ricellis commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r609576045



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (cf. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High-throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to include the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" composed of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.
+
+## Backwards Compatibility
+
+The existing `_changes` endpoint will continue to function. We will implement
+a scatter/gather coordinator following the same logic that we used for views in
+"classic" CouchDB. Note that sequence entries are totally-ordered and unique
+across all shards, so we can reassemble a single ordered list of updates as if
+we were dealing with a single subspace the entire time.
+
+# Advantages and Disadvantages
+
+Advantages
+- Reduced write hotspots in FoundationDB
+- Linearly scalable indexing throughput
+- Linearly scalable _changes feed consumption
+
+Disadvantages
+- Introduction of a new per-database tunable parameter
+- No retroactive improvement in _changes throughput for the sequence range prior
+  to the reshard event (e.g., a new index added to the database will start with
+  the parallelism defined at DB creation time)
+ 
+# Key Changes
+
+Users would be able to modify the shard count of the changes feed up and down to
+have some control over resources devoted to background index maintenance. While
+backwards compatibility with the existing `_changes` API would be maintained, a
+new API would directly expose the shard-level feeds for easier, more efficient
+parallel consumption.
+
+## Applications and Modules affected
+
+`fabric2_fdb:write_doc/6` currently contains the logic that chooses where to
+write the sequence index entries.
+
+`fabric2_db:fold_changes/5` is the code that currently consumes the changes from
+the `?DB_CHANGES` subspace. We might repurpose this to be the code that reads
+from a single shard. The `fold_changes/5` code is only used in two locations:
+
+- `chttpd_changes:send_changes/3`, i.e. the external API
+- `couch_views_indexer:fold_changes/2`, i.e. the indexing subsystem
+
+Additionally, we have `fabric2_fdb:get_last_change/1` that would need to be
+modified to take the highest sequence across all current shards of the database.
+
+We would likely have a new `fabric2_changes` module to collect the logic for
+discovering endpoints, scatter/gather merging of shard feeds, resharding
+invocations, etc.
+
+## HTTP API additions
+
+Happy to take suggestions on what color to paint the shed, but I imagine
+something like
+
+`GET /db/_changes/<ShardID>`
+
+will provide the change feed for a given shard using JSONL for all responses,

Review comment:
       AFAICT it [still doesn't have an official mime-type though](https://mailarchive.ietf.org/arch/msg/json/dWMWD0JDa2HiUYjWjLjrQExeIx4/).
   If it is going to be this format, please don't (ab)use the `application/json` type the way the existing continuous feed does: tooling that automatically handles responses based on mime-type breaks when it receives JSONL with the JSON mime-type.
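
A minimal sketch of that failure mode, using only Python's standard `json` module. The two-line body is made up for illustration (it only imitates the shape of existing `_changes` rows) and is not output from any CouchDB build; `parse_as_json` and `parse_as_jsonl` are hypothetical helper names.

```python
import json

# Hypothetical JSONL body: one JSON object per line, all values invented.
BODY = (
    '{"seq": "1-abc", "id": "doc1", "changes": [{"rev": "1-x"}]}\n'
    '{"seq": "2-def", "id": "doc2", "changes": [{"rev": "1-y"}]}\n'
)


def parse_as_json(text):
    """What a generic client does for `application/json`: parse the whole body."""
    return json.loads(text)


def parse_as_jsonl(text):
    """Parse each non-empty line as its own JSON document."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


try:
    parse_as_json(BODY)
    whole_body_ok = True
except json.JSONDecodeError:
    # The second line counts as extra data after the first complete JSON value.
    whole_body_ok = False

rows = parse_as_jsonl(BODY)
```

A client that dispatches on `Content-Type: application/json` would take the first path and fail, which is why a distinct type for line-delimited responses matters here.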
   







[GitHub] [couchdb-documentation] rnewson commented on pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
rnewson commented on pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#issuecomment-879263304


   It sounds like we've not yet exhausted the options other than re-adding sharding. Mike, your point on multi-doc updates is a good one, and perhaps that needs discussing before we commit to this path.
   
   The essential problem with multi-doc transactions is replication. Specifically, it would be a poor (or, at least, surprising) implementation of multi-doc transactions if those updates are not applied atomically when replicated, though it would be by far the easiest way to do it. Any form of replication enhancement to support this seems to have a significant scalability penalty. Not everybody would use multi-doc transactions, and those that would wouldn't necessarily do it for _all_ updates, but building something we know will work poorly if used 'too much' is not a good idea (and particularly this thing, multi-doc txn, which we've explicitly prohibited for scalability reasons this whole time).
   
   A future multi-doc txn option would complicate the work proposed in this RFC but doesn't seem to break it (though it diminishes its performance characteristics, all the way to zero in some cases).
   
   Setting multi-doc txn aside, what are the limits of the current (relatively simple) changes implementation? How fast can we read the changes feed today? Can indexing read the existing changes feed from multiple places in parallel at runtime and still gain a performance boost?





[GitHub] [couchdb-documentation] ricellis commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
ricellis commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r609714571



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive

Review comment:
       Taken in conjunction with the previous statement
   >all future updates to that database will be routed to those new subspaces
   
   Does this mean that a consumer wishing to process the entire changes feed (i.e. from `0`) on a database that had been resharded would need to iterate over each shard of each tombstoned space until reaching the notification, and then process all the current shards as well?
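
To make the question concrete, here is a rough sketch of what such a consumer might have to do, assuming the shard map is a list of `(first_seq, shard_ids)` entries like the RFC's `?CHANGES_SHARDS` metadata. `SHARD_MAP`, `FEEDS`, and `read_all` are hypothetical names, the data is invented, and the in-memory lists stand in for the per-shard HTTP feeds.

```python
import heapq

# Hypothetical shard map: (first sequence of the topology, its shard IDs).
SHARD_MAP = [
    (0, ["s0"]),
    (5678, ["s1", "s2", "s3", "s4"]),
]

# Hypothetical per-shard feeds: (seq, doc_id) pairs, each list sorted by seq.
FEEDS = {
    "s0": [(10, "a"), (20, "b")],
    "s1": [(5700, "e")],
    "s2": [(5690, "c")],
    "s3": [],
    "s4": [(5750, "d")],
}


def read_all(shard_map, feeds, since=0):
    """Yield (seq, doc_id) in sequence order across every topology after `since`."""
    for i, (_first_seq, shard_ids) in enumerate(shard_map):
        next_first = shard_map[i + 1][0] if i + 1 < len(shard_map) else None
        if next_first is not None and next_first <= since:
            continue  # this topology was replaced at or before `since`
        # Sequences are totally ordered across shards, so a k-way merge
        # of the sorted shard feeds reproduces a single ordered feed.
        for seq, doc_id in heapq.merge(*(feeds[s] for s in shard_ids)):
            if seq > since:
                yield seq, doc_id
```

Under these assumptions, a reader starting from `0` walks the tombstoned topology first and only then the four current shards; a reader starting at `since=5678` can skip the old shard entirely.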

##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+## HTTP API additions
+
+Happy to take suggestions on what color to paint the shed, but I imagine
+something like
+
+`GET /db/_changes/<ShardID>`
+
+will provide the change feed for a given shard using JSONL for all responses,
+but otherwise matching the existing format of the changes feed, while
+
+`GET /db/_changes/_meta?since=N`
+
+can be used to retrieve the shard topology as of a particular sequence.

Review comment:
       So to go back to my earlier example if I was processing the feed since `0` on a database that had been resharded I might do:
   * `GET /db/_changes/_meta?since=0`
   * For each shard ID:
       * `GET /db/_changes/<ShardID>`
       * process changes
       * reach a notification that there was a resharding event
   
   Now how do I know what the new shard IDs are to continue processing the feed?
   It seems like I should call `GET /db/_changes/_meta?since=N`, but isn't the last seq I received still part of the original topology, so that I don't have an `N` that will tell me about the new topology?
   Is the intention that the notification will tell me what the new `N` is? If so, how does that work if there haven't been any new changes to the database yet? Does the resharding itself create a sequence?
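
A small sketch of why the last seq is not enough. It assumes `_meta?since=N` resolves to the shard map entry with the greatest first sequence less than or equal to `N`; the RFC does not spell out that rule, so treat it as my assumption. `SHARD_MAP` and `topology_as_of` are hypothetical names and the sequences are invented.

```python
import bisect

# Hypothetical shard map: (first sequence of the topology, its shard IDs).
SHARD_MAP = [
    (0, ["s0"]),
    (5678, ["s1", "s2", "s3", "s4"]),
]


def topology_as_of(shard_map, since):
    """Return the entry whose first sequence is the greatest one <= `since`."""
    starts = [first_seq for first_seq, _ in shard_map]
    idx = bisect.bisect_right(starts, since) - 1
    if idx < 0:
        raise ValueError("since precedes the first topology")
    return shard_map[idx]


# The last change seen on the old shard still belongs to the old topology.
last_seq_seen = 5600
old = topology_as_of(SHARD_MAP, last_seq_seen)

# Only a sequence at or beyond the reshard point reveals the new shard IDs.
new = topology_as_of(SHARD_MAP, 5678)
```

With this lookup rule the consumer needs the reshard sequence itself (5678 in this made-up example), so the notification would have to carry it or the metadata endpoint would need another way to ask for the successor topology.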







[GitHub] [couchdb-documentation] rnewson commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
rnewson commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r608872787



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (cf. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High-throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to include the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" comprised of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.

Review comment:
       Thanks, that helps. I was thinking only of the scale-down case. I also don't see how we could have more tasks than shards; that's the motivating case for re-introducing sharding.







[GitHub] [couchdb-documentation] kocolosk commented on pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
kocolosk commented on pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#issuecomment-879446621


   Yeah, transactions in the changes feed are a veritable "can of worms" with or without sharding, so @mikerhodes I think you were correct to call that one out of scope for this discussion. The last time I thought hard about it I came to the same conclusion that you did, which is that we'd want to build a separate transaction log rather than trying to turn the changes feed into said log.
   
   @nickva I can see where the locality API would be a useful way to parallelize the initial index build while minimizing the load on FDB. Of course it doesn't help much with DBs that see a high write QPS in steady-state, since the core data model still concentrates all writes to the seq index in a single FDB shard. I'll freely admit that the write hot spot may not be the most urgent problem to solve; I think my rationale for the proposal was in part the idea of taking out a few birds with a single stone and providing a well-defined scale out story for other external `_changes` consumers.
   
   I don't think I have the time at the moment to help implement this, and won't be offended if folks want to close it. We can always revisit later.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@couchdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [couchdb-documentation] nickva commented on pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
nickva commented on pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#issuecomment-881764357


   @kocolosk Good point, the locality trick would be useful internally, say to process the changes feed for indexing, but wouldn't help with write hotspots. The design of the _changes feed external API is pretty neat and I think it may be worth going that way eventually, but perhaps with an auto-sharding setup so that users don't have to think about Q at all.
   
   Found a description of how the FDB backup system avoids hot write shards https://forums.foundationdb.org/t/keyspace-partitions-performance/168/2. Apparently it's based on writing to `(hash(version/1e6), version)` key ranges, to strike a balance between being able to query ranges and avoiding writing more than 1 second of data (by default versions advance at a rate of about 1e6 per second) to any one shard at a time on average. Not sure yet if that's an idea we can borrow directly but perhaps there is something there...
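   
   A minimal sketch of the `(hash(version/1e6), version)` layout described above, for pasting into an Erlang shell. It is not FDB's actual implementation: `erlang:phash2/2` stands in for the hash, and `BucketedKey` and the bucket count of 16 are hypothetical choices for illustration only.
   
   ```erlang
   %% Hypothetical sketch: prefix a version-ordered key with a hash of its
   %% ~1e6-version window, so successive windows land in different key ranges.
   %% erlang:phash2/2 and NumBuckets are assumptions, not what FDB uses.
   BucketedKey = fun(Version, NumBuckets) ->
       Window = Version div 1000000,
       {erlang:phash2(Window, NumBuckets), Version}
   end.
   
   %% Versions inside the same window share a bucket, so a range read
   %% within one window still hits one contiguous key range.
   {Bucket, 1234567} = BucketedKey(1234567, 16).
   {Bucket, 1999999} = BucketedKey(1999999, 16).
   ```
   
   Writes only move to a different bucket when the window changes, which is what bounds the amount of data sent to one key range at a time.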
   
   Regarding changes feed being a bottleneck for indexing, we did a quick and dirty test by reading 1M and 10M changes on a busy cluster (3 storage nodes) and we were able to get about 58-64k rows/sec with just an empty accumulator which counts rows.
   
   ```
   {ok, Db} = fabric2_db:open(<<"perf-test-user/put_insert_1626378013">>, []).
   Fun = fun(_Change, Acc) -> {ok, Acc + 1} end.
   
   (dbcore@127.0.0.1)6> timer:tc(fun() -> fabric2_db:fold_changes(Db, 0, Fun, 0, [{limit, 1000000}]) end).
   {16550135,{ok,1000000}}
   
   (dbcore@127.0.0.1)12> timer:tc(fun() -> fabric2_db:fold_changes(Db, 0, Fun, 0, [{limit, 10000000}]) end).
   {156290604,{ok,10000000}}
   ....
   ```
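   
   As a quick check of the arithmetic for the two runs shown above: `timer:tc/1` returns `{Microseconds, Result}`, so rows/sec is the row count divided by the elapsed seconds. `RowsPerSec` is just a throwaway helper name for this sketch.
   
   ```erlang
   %% timer:tc/1 returns {Microseconds, Result};
   %% rows/sec = Rows / (Microseconds / 1.0e6).
   RowsPerSec = fun({Micros, {ok, Rows}}) -> Rows / (Micros / 1.0e6) end.
   
   RowsPerSec({16550135, {ok, 1000000}}).    %% about 60.4k rows/sec
   RowsPerSec({156290604, {ok, 10000000}}).  %% about 64.0k rows/sec
   ```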
   
   For indexing at least, it seems that's not too bad. We'd probably want to find a way to parallelize doc fetches and, most of all, concurrent index updates.
   
   
   
   





[GitHub] [couchdb-documentation] rnewson commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
rnewson commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r608543608



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to include the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" comprised of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.
+
+## Backwards Compatibility
+
+The existing `_changes` endpoint will continue to function. We will implement
+a scatter/gather coordinator following the same logic that we used for views in
+"classic" CouchDB. Note that sequence entries are totally-ordered and unique
+across all shards, so we can reassemble a single ordered list of updates as if
+we were dealing with a single subspace the entire time.
+
+# Advantages and Disadvantages
+
+Advantages
+- Reduced write hotspots in FoundationDB
+- Linearly scalable indexing throughput
+- Linearly scalable _changes feed consumption
+
+Disadvantages
+- Introduction of a new per-database tunable parameter
+- No retroactive improvement in _changes throughput for the sequence range prior
+  to the reshard event (e.g., a new index added to the database will start with
+  the parallelism defined at DB creation time)
+ 
+# Key Changes
+
+Users would be able to modify the shard count of the changes feed up and down to
+have some control over resources devoted to background index maintenance. While

Review comment:
       In some environments it will not be appropriate to give users this level of control; fairness must be designed into the view indexer.







[GitHub] [couchdb-documentation] kocolosk commented on pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
kocolosk commented on pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#issuecomment-815300877


   Thanks for the read, Paul. The Intro was meant to convey the goals of the proposal, in priority order:
   
   > This proposal is designed to improve indexing throughput, reduce hot spots for write-intensive workloads, and offer a horizontally-scalable API for consumers to process the change capture feed for an individual database in CouchDB 4.0.
   
   That said, I totally spaced out on the `ebtree` data structure here; I was still thinking of the model where view KVs are stored as KVs directly in FDB. So yeah, this is spot-on:
   
   > We're still left with the fact that writes to the view ebtree aren't currently parallelizable. If we want to make view updates faster, the first thing I'd investigate is how to upgrade ebtree to allow update-in-place semantics with the MVCC semantics that FDB offers.





[GitHub] [couchdb-documentation] nickva commented on pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
nickva commented on pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#issuecomment-879250910


   Agree with @rnewson. Even if we switch the index storage format to allow parallelizable updates, adding a static Q would be a step back, it seems.
   
   One issue is at the user/API level. We'd bring back Q, which we didn't want to have to deal with now that we're using FDB. And then in the code, we just removed the sharding code in fabric, and I am not too excited about bringing parts of it back, unless it's a last resort and nothing else works. We could invent some auto-sharding of course, but that would be even more complexity.
   
   It seems we'd also want to separate changes feed improvements from indexing improvements a bit better. Could we speed up indexing without a static Q sharding of the changes feed, with all the API changes involved, hand-written resharding code (epochs), and hard values?
   
   I think we can, if we invent a new index structure that allows parallelizable updates. Like, say, an inverted JSON index for Mango queries based on https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20171020_inverted_indexes.md.
   
   The idea I had was to use the locality API to split the _changes feed into sub-sequences, and either start separate couch_jobs jobs (or just processes under a single couch_job indexer) to fetch docs, process them, and write to the index in parallel. So, if the _changes sequence looks like `[10, 20, 25, 30]`, the locality API might split it as `[10, 20]`, `[25, 30]`. Then two indexers would index those in parallel. In the meantime, the doc at sequence 20 could be updated and now be at sequence 35. Then we'd catch up from 35 up to the next db sequence, and so on. The benefit there would be to avoid managing a static Q at all. The downside is that it would work only for a write-parallelizable index, and only if we "hide" the index being built in the background from queries (it would look quite odd otherwise, as it wouldn't be built in changes feed order). Then, once it's built, if we can update the index transactionally, we'd get consistent reads on it.
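   
   To make the `[10, 20, 25, 30]` example concrete, here is a rough sketch for the Erlang shell. `SplitAt` is a hypothetical helper, and the boundary list `[25]` is simply assumed as input; in practice the boundaries would come from the locality API, which is not modelled here.
   
   ```erlang
   %% Hypothetical helper: split a sorted list of sequences into sub-ranges,
   %% starting a new sub-range at each boundary sequence.
   SplitAt = fun Split(Seqs, []) ->
                     [Seqs];
                 Split(Seqs, [Boundary | Rest]) ->
                     {Before, After} =
                         lists:splitwith(fun(S) -> S < Boundary end, Seqs),
                     [Before | Split(After, Rest)]
             end.
   
   %% One boundary at 25 yields the two sub-sequences from the example;
   %% each could then be handed to its own indexer process.
   [[10, 20], [25, 30]] = SplitAt([10, 20, 25, 30], [25]).
   ```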





[GitHub] [couchdb-documentation] mikerhodes commented on pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
mikerhodes commented on pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#issuecomment-879193040


   @rnewson I think that it's important to note that we are "only" partitioning the changes feed subspace -- not primary data nor indexes themselves. IIRC Indexes were the primary problem within the 3.0 scheme because of the ordered scatter-gather we needed to perform. I'm not convinced that the partitioning/sharding of the changes index space is something that is a bad thing, at least with regards to some of the performance problems that shards caused in 3.0.
   
   I'd be curious what @kocolosk thinks about whether partitioning the changes feed brings back the bad stuff from CouchDB 3.0?





[GitHub] [couchdb-documentation] rnewson commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
rnewson commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r608541917



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to include the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" comprised of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.

Review comment:
       How do we account for fairness here? An index of 1000 shards would appear to get to do more work than an index of 1 shard. I can't quite tell if we're free to commit to each shard independently. I _think_ so, as long as we don't execute queries until they've all been "updated to the same final snapshot". If I'm reading that right, we would be free to vary the number of shard index jobs at runtime?







[GitHub] [couchdb-documentation] rnewson commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
rnewson commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r608873182



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to include the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" comprised of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.
+
+## Backwards Compatibility
+
+The existing `_changes` endpoint will continue to function. We will implement
+a scatter/gather coordinator following the same logic that we used for views in
+"classic" CouchDB. Note that sequence entries are totally-ordered and unique
+across all shards, so we can reassemble a single ordered list of updates as if
+we were dealing with a single subspace the entire time.
+
+# Advantages and Disadvantages
+
+Advantages
+- Reduced write hotspots in FoundationDB
+- Linearly scalable indexing throughput
+- Linearly scalable _changes feed consumption
+
+Disadvantages
+- Introduction of a new per-database tunable parameter
+- No retroactive improvement in _changes throughput for the sequence range prior
+  to the reshard event (e.g., a new index added to the database will start with
+  the parallelism defined at DB creation time)
+ 
+# Key Changes
+
+Users would be able to modify the shard count of the changes feed up and down to
+have some control over resources devoted to background index maintenance. While
+backwards compatibility with the existing `_changes` API would be maintained, a
+new API would directly expose the shard-level feeds for easier, more efficient
+parallel consumption.
+
+## Applications and Modules affected
+
+`fabric2_fdb:write_doc/6` currently contains the logic that chooses where to
+write the sequence index entries.
+
+`fabric2_db:fold_changes/5` is the code that currently consumes the changes from
+the `?DB_CHANGES` subspace. We might repurpose this to be the code that reads
+from a single shard. The `fold_changes/5` code is only used in two locations:
+
+- `chttpd_changes:send_changes/3`, i.e. the external API
+- `couch_views_indexer:fold_changes/2`, i.e. the indexing subsystem
+
+Additionally, we have `fabric2_fdb:get_last_change/1` that would need to be
+modified to take the highest sequence across all current shards of the database.
+
+We would likely have a new `fabric2_changes` module to collect the logic for
+discovering endpoints, scatter/gather merging of shard feeds, resharding
+invocations, etc.
+
+## HTTP API additions
+
+Happy to take suggestions on what color to paint the shed, but I imagine
+something like
+
+`GET /db/_changes/<ShardID>`
+
+will provide the change feed for a given shard using JSONL for all responses,

Review comment:
       and we don't even get a mention







[GitHub] [couchdb-documentation] kocolosk commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
kocolosk commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r609929193



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive

Review comment:
       Yes that's correct







[GitHub] [couchdb-documentation] rnewson commented on pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
rnewson commented on pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#issuecomment-879175344


   This seems to have been dropped, can someone confirm? I stand by my comment above fwiw, that adding sharding after all the effort to switch to FDB to get away from it is regrettable. I could only countenance this in couchdb 4.0 if there is no alternative and it is demonstrated to be essential.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@couchdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [couchdb-documentation] nickva edited a comment on pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
nickva edited a comment on pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#issuecomment-879250910


   Agree with @rnewson. Even if we switch the index storage format to allow parallelizable updates, adding a static Q would be a step back, it seems.
   
   One issue is at the user/API level. We'd bring back Q, which we didn't want to have to deal with now that we're using FDB. And then in the code, we just removed the sharding code in fabric, so I am not too excited about bringing parts of it back unless it's a last resort and nothing else works. We could invent some auto-sharding of course, but that would be even more complexity.
   
   It seems we'd also want to separate change feed improvements from indexing improvements a bit better. Could we speed up indexing without a static Q sharding of the change feed, with all the API changes involved and hand-written resharding code (epochs) and hard values?
   
   I think we can, if we invent a new index structure that allows parallelizable updates. Like, say, an inverted JSON index for Mango queries based on https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20171020_inverted_indexes.md.
   
   The idea I had was to use the locality API to split the _changes feed into sub-sequences, and start either separate couch_jobs jobs or just processes under a single couch_job indexer to fetch docs, process them, and write to the index in parallel. So, if the _changes sequence looks like `[10, 20, 25, 30]`, the locality API might split it as `[10, 20]`, `[25, 30]`. Then two indexers would index those in parallel. In the meantime the doc at sequence 20 could be updated and now be at sequence 35. Then we'd catch up from 35 up to the next db sequence, and so on. The benefit there would be to avoid managing a static Q at all. The downside is that it would work only for a write-parallelizable index, and only if we "hide" the index being built in the background from queries (it would look quite odd otherwise, as it wouldn't be built in changes feed order). Then, once it's built, if we can update the index transactionally, we'd get consistent reads on it.
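
A rough sketch of that split-then-catch-up flow, assuming plain integer sequences and a thread pool standing in for the indexer processes. `split_sequences`, `index_chunk`, and `catch_up` are hypothetical helpers made up for illustration; in the real system the split points would come from the FoundationDB locality API rather than from an even chunking of a list.

```python
from concurrent.futures import ThreadPoolExecutor


def split_sequences(seqs, parts):
    """Split a sorted list of sequences into at most `parts` contiguous chunks.

    Stand-in for boundaries that would really come from the FDB locality API.
    """
    if not seqs:
        return []
    size = -(-len(seqs) // parts)  # ceiling division
    return [seqs[i:i + size] for i in range(0, len(seqs), size)]


def index_chunk(chunk):
    # Placeholder for "fetch docs, run map functions, write index rows".
    return [("indexed", seq) for seq in chunk]


def build_in_parallel(seqs, parts=2):
    chunks = split_sequences(seqs, parts)
    with ThreadPoolExecutor(max_workers=parts) as pool:
        results = list(pool.map(index_chunk, chunks))
    return chunks, results


def catch_up(current_seqs, built_up_to):
    # Anything updated during the build now sits at a higher sequence.
    return [seq for seq in current_seqs if seq > built_up_to]


chunks, results = build_in_parallel([10, 20, 25, 30])
# The doc that was at sequence 20 has since been updated and moved to 35.
pending = catch_up([10, 25, 30, 35], built_up_to=30)
```

The sketch leaves out the hard part named above: the index has to accept writes from both workers in any order, and it has to stay hidden from queries until the catch-up pass has nothing left to do.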





[GitHub] [couchdb-documentation] rnewson commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
rnewson commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r608542958



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to include the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" comprised of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.
+
+## Backwards Compatibility
+
+The existing `_changes` endpoint will continue to function. We will implement
+a scatter/gather coordinator following the same logic that we used for views in
+"classic" CouchDB. Note that sequence entries are totally-ordered and unique
+across all shards, so we can reassemble a single ordered list of updates as if
+we were dealing with a single subspace the entire time.
+
+# Advantages and Disadvantages
+
+Advantages
+- Reduced write hotspots in FoundationDB
+- Linearly scalable indexing throughput
+- Linearly scalable _changes feed consumption
+
+Disadvantages
+- Introduction of a new per-database tunable parameter

Review comment:
       this is an understatement. the move to fdb was hoped to remove all the notions of `q` (and scatter-gather for that matter) that have proved so problematic. The proposal does contain enough details on how this new `q` can vary over time, though.
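
For context, the scatter/gather that the RFC proposes for the existing `_changes` endpoint amounts to a k-way merge of per-shard feeds by sequence. A minimal sketch follows, assuming each feed is already sorted and that entries are `(sequence, doc_id)` tuples with integer sequences; both the tuple shape and the `merge_shard_feeds` name are illustrative assumptions, not part of the RFC.

```python
import heapq


def merge_shard_feeds(shard_feeds):
    """Merge per-shard feeds, each already sorted by sequence, into one ordered feed."""
    # Sequences are unique across shards, so comparing on the sequence alone
    # yields a total order over the merged entries.
    return list(heapq.merge(*shard_feeds, key=lambda entry: entry[0]))


feeds = [
    [(1, "a"), (4, "d")],
    [(2, "b"), (5, "e")],
    [(3, "c")],
]
merged = merge_shard_feeds(feeds)
```

The merge itself is cheap. The operational cost raised here lies elsewhere, in having a shard count to choose and manage at all.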







[GitHub] [couchdb-documentation] kocolosk commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
kocolosk commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r608731337



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to include the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" comprised of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.

Review comment:
       Yes, with the default hashing scheme each indexer task can commit to the same secondary index independently since they are operating on disjoint sets of documents. If, on the other hand, you had a hashing scheme where you just sprayed document updates randomly across shards you might need to add some extra bookkeeping to avoid cases where two tasks are concurrently operating on different versions of the same document (and maybe committing updates in the wrong order, i.e. overwriting index entries from a newer version with an older one).
   
   A resharding event would need to act as a barrier here; the indexer job would need to allow all its tasks to finish on the old shard map and then start a new job with the new map.
   
   I suppose if you wanted to run fewer indexing tasks than the number of shards you could certainly do that and have each task work on one or more shards in series without a ton of extra complexity. Running _more_ tasks than the number of shards is a different proposal. Maybe possible, but wasn't my focus here. Others might be able to comment on why we haven't done that already.
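   
   A minimal Python sketch of the two ideas above, under stated assumptions: `shard_for` is a hypothetical stand-in for the default hashing scheme (a plain modulo hash, not the consistent hashing the RFC proposes), and `assign_shards` is a hypothetical helper that deals shards out to fewer tasks than there are shards. Neither name exists in CouchDB.

```python
import hashlib


def shard_for(partition_key, shard_ids):
    """Pick a shard for a partition key.

    Assumption: a simple modulo hash, used only to show that every update
    to the same key lands in the same shard for a fixed shard map.
    """
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return shard_ids[int.from_bytes(digest[:8], "big") % len(shard_ids)]


def assign_shards(shard_ids, task_count):
    """Deal shards round-robin to at most task_count indexer tasks.

    Each task would then work through its shards in series.
    """
    if task_count < 1:
        raise ValueError("task_count must be at least 1")
    tasks = [[] for _ in range(min(task_count, len(shard_ids)))]
    for i, shard_id in enumerate(shard_ids):
        tasks[i % len(tasks)].append(shard_id)
    return tasks
```

   Because the routing is a pure function of the partition key and the shard map, two tasks never see the same document, which is what lets them commit to the index independently. A new shard map changes the routing, hence the barrier on resharding.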
   







[GitHub] [couchdb-documentation] nickva edited a comment on pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
nickva edited a comment on pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#issuecomment-879250910


   Agree with @rnewson . Even if we switch the index storage format to allow parallelizable updates, adding a static Q would be a step back, it seems.
   
   One issue is at the user/API level. We'd bring back Q, which we didn't want to have to deal with any more with FDB as a backend. And then in the code, we just removed the sharding code in fabric; I am not too excited about bringing parts of it back, unless it's a last resort and nothing else works. We could invent some auto-sharding, of course, but that would be even more complexity.
   
   It seems we'd also want to separate changes feed improvements from indexing improvements a bit better. Could we speed up indexing without a static Q sharding of the changes feed, with all the API changes involved, hand-written resharding code (epochs) and hard values?
   
   I think we can, if we invent a new index structure that allows parallelizable updates. Like, say, an inverted JSON index for Mango queries based on https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20171020_inverted_indexes.md. 
   
   The idea I had was to use the locality API to split the _changes feed into sub-sequences, and either start a separate couch_jobs job (or just processes under a single couch_job indexer) to fetch docs, process them, and write to the index in parallel. So, if the _changes sequence looks like `[10, 20, 25, 30]`, the locality API might split it as `[10, 20]`, `[25, 30]`. Then two indexers would index those in parallel. In the meantime the doc at sequence 20 could be updated and now be at sequence 35. Then we'd catch up from 35 up to the next db sequence, and so on. The benefit there would be to avoid managing a static Q at all. The downside is that it would work only for a write-parallelizable index, and only if we "hide" the index being built in the background from queries (as it would look quite odd, since it wouldn't be built in changes feed order). Then, once it's built, if we can update the index transactionally, we'd get consistent reads on it.
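   
   To make the split concrete, here is a rough Python sketch. It assumes the boundary sequences have already been obtained somehow (in the idea above they would be derived from the FDB locality API); `split_by_boundaries` is a hypothetical helper, not existing CouchDB code.

```python
from bisect import bisect_left


def split_by_boundaries(seqs, boundaries):
    """Split a sorted list of sequences into contiguous chunks.

    Each boundary value starts a new chunk. Boundaries that fall before
    the first sequence or between no sequences produce no empty chunks.
    """
    chunks = []
    start = 0
    for boundary in sorted(boundaries):
        end = bisect_left(seqs, boundary)
        if end > start:
            chunks.append(seqs[start:end])
            start = end
    if start < len(seqs):
        chunks.append(seqs[start:])
    return chunks


# The example from the comment: one boundary at 25 yields two chunks,
# each of which could be handed to its own indexer process.
chunks = split_by_boundaries([10, 20, 25, 30], [25])
```

   Each chunk is a snapshot of the feed at split time; a doc that moves to sequence 35 while its chunk is being indexed is picked up by the later catch-up pass.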





[GitHub] [couchdb-documentation] rnewson commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
rnewson commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r608544529



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to include the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" comprised of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.
+
+## Backwards Compatibility
+
+The existing `_changes` endpoint will continue to function. We will implement
+a scatter/gather coordinator following the same logic that we used for views in
+"classic" CouchDB. Note that sequence entries are totally-ordered and unique
+across all shards, so we can reassemble a single ordered list of updates as if
+we were dealing with a single subspace the entire time.
+
+# Advantages and Disadvantages
+
+Advantages
+- Reduced write hotspots in FoundationDB
+- Linearly scalable indexing throughput
+- Linearly scalable _changes feed consumption
+
+Disadvantages
+- Introduction of a new per-database tunable parameter
+- No retroactive improvement in _changes throughput for the sequence range prior
+  to the reshard event (e.g., a new index added to the database will start with
+  the parallelism defined at DB creation time)
+ 
+# Key Changes
+
+Users would be able to modify the shard count of the changes feed up and down to
+have some control over resources devoted to background index maintenance. While
+backwards compatibility with the existing `_changes` API would be maintained, a
+new API would directly expose the shard-level feeds for easier, more efficient
+parallel consumption.
+
+## Applications and Modules affected
+
+`fabric2_fdb:write_doc/6` currently contains the logic that chooses where to
+write the sequence index entries.
+
+`fabric2_db:fold_changes/5` is the code that currently consumes the changes from
+the `?DB_CHANGES` subspace. We might repurpose this to be the code that reads
+from a single shard. The `fold_changes/5` code is only used in two locations:
+
+- `chttpd_changes:send_changes/3`, i.e. the external API
+- `couch_views_indexer:fold_changes/2`, i.e. the indexing subsystem
+
+Additionally, we have `fabric2_fdb:get_last_change/1` that would need to be
+modified to take the highest sequence across all current shards of the database.
+
+We would likely have a new `fabric2_changes` module to collect the logic for
+discovering endpoints, scatter/gather merging of shard feeds, resharding
+invocations, etc.
+
+## HTTP API additions
+
+Happy to take suggestions on what color to paint the shed, but I imagine
+something like
+
+`GET /db/_changes/<ShardID>`
+
+will provide the change feed for a given shard using JSONL for all responses,

Review comment:
       `JSONL`! it finally has a name.







[GitHub] [couchdb-documentation] kocolosk commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
kocolosk commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r608733164



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to include the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" comprised of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.
+
+## Backwards Compatibility
+
+The existing `_changes` endpoint will continue to function. We will implement
+a scatter/gather coordinator following the same logic that we used for views in
+"classic" CouchDB. Note that sequence entries are totally-ordered and unique
+across all shards, so we can reassemble a single ordered list of updates as if
+we were dealing with a single subspace the entire time.
+
+# Advantages and Disadvantages
+
+Advantages
+- Reduced write hotspots in FoundationDB
+- Linearly scalable indexing throughput
+- Linearly scalable _changes feed consumption
+
+Disadvantages
+- Introduction of a new per-database tunable parameter
+- No retroactive improvement in _changes throughput for the sequence range prior
+  to the reshard event (e.g., a new index added to the database will start with
+  the parallelism defined at DB creation time)
+ 
+# Key Changes
+
+Users would be able to modify the shard count of the changes feed up and down to
+have some control over resources devoted to background index maintenance. While
+backwards compatibility with the existing `_changes` API would be maintained, a
+new API would directly expose the shard-level feeds for easier, more efficient
+parallel consumption.
+
+## Applications and Modules affected
+
+`fabric2_fdb:write_doc/6` currently contains the logic that chooses where to
+write the sequence index entries.
+
+`fabric2_db:fold_changes/5` is the code that currently consumes the changes from
+the `?DB_CHANGES` subspace. We might repurpose this to be the code that reads
+from a single shard. The `fold_changes/5` code is only used in two locations:
+
+- `chttpd_changes:send_changes/3`, i.e. the external API
+- `couch_views_indexer:fold_changes/2`, i.e. the indexing subsystem
+
+Additionally, we have `fabric2_fdb:get_last_change/1` that would need to be
+modified to take the highest sequence across all current shards of the database.
+
+We would likely have a new `fabric2_changes` module to collect the logic for
+discovering endpoints, scatter/gather merging of shard feeds, resharding
+invocations, etc.
+
+## HTTP API additions
+
+Happy to take suggestions on what color to paint the shed, but I imagine
+something like
+
+`GET /db/_changes/<ShardID>`
+
+will provide the change feed for a given shard using JSONL for all responses,

Review comment:
       Yep! https://jsonlines.org/on_the_web/
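   
   For anyone unfamiliar with the format, a minimal Python sketch of consuming a JSON Lines body: one JSON value per line. `parse_jsonl` is a hypothetical helper, and the `seq` and `id` fields in the sample rows are illustrative assumptions about what a shard feed row might contain.

```python
import json


def parse_jsonl(lines):
    """Yield one decoded JSON value per non-blank line."""
    for raw in lines:
        line = raw.strip()
        if not line:
            # Tolerate blank lines, e.g. heartbeats on a long-lived feed.
            continue
        yield json.loads(line)


sample = ['{"seq": 1, "id": "a"}\n', "\n", '{"seq": 2, "id": "b"}\n']
rows = list(parse_jsonl(sample))
```

   Since each line is a complete value, a consumer can process rows as they arrive without buffering the whole response.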







[GitHub] [couchdb-documentation] kocolosk commented on a change in pull request #651: Add RFC on sharded changes

Posted by GitBox <gi...@apache.org>.
kocolosk commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r609931796



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High throughput writers can overwhelm that indexer and ensure that it will never
+bring the view up-to-date. The previous RFC mentions parallelizing the build as
+a future optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry with a
+flag as a tombstone for this shard map:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land
+in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it the current
+write transaction will fail (because of the tombstone commit).
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval will need to account for the possibility that the
+changes exist in the deprecated subspace.
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" composed of a set
+of "tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.
+
+## Backwards Compatibility
+
+The existing `_changes` endpoint will continue to function. We will implement
+a scatter/gather coordinator following the same logic that we used for views in
+"classic" CouchDB. Note that sequence entries are totally-ordered and unique
+across all shards, so we can reassemble a single ordered list of updates as if
+we were dealing with a single subspace the entire time.
+
+# Advantages and Disadvantages
+
+Advantages
+- Reduced write hotspots in FoundationDB
+- Linearly scalable indexing throughput
+- Linearly scalable _changes feed consumption
+
+Disadvantages
+- Introduction of a new per-database tunable parameter
+- No retroactive improvement in _changes throughput for the sequence range prior
+  to the reshard event (e.g., a new index added to the database will start with
+  the parallelism defined at DB creation time)
+
+# Key Changes
+
+Users would be able to modify the shard count of the changes feed up and down to
+have some control over resources devoted to background index maintenance. While
+backwards compatibility with the existing `_changes` API would be maintained, a
+new API would directly expose the shard-level feeds for easier, more efficient
+parallel consumption.
+
+## Applications and Modules affected
+
+`fabric2_fdb:write_doc/6` currently contains the logic that chooses where to
+write the sequence index entries.
+
+`fabric2_db:fold_changes/5` is the code that currently consumes the changes from
+the `?DB_CHANGES` subspace. We might repurpose this to be the code that reads
+from a single shard. The `fold_changes/5` code is only used in two locations:
+
+- `chttpd_changes:send_changes/3`, i.e. the external API
+- `couch_views_indexer:fold_changes/2`, i.e. the indexing subsystem
+
+Additionally, we have `fabric2_fdb:get_last_change/1` that would need to be
+modified to take the highest sequence across all current shards of the database.
+
+We would likely have a new `fabric2_changes` module to collect the logic for
+discovering endpoints, scatter/gather merging of shard feeds, resharding
+invocations, etc.
+
+## HTTP API additions
+
+Happy to take suggestions on what color to paint the shed, but I imagine
+something like
+
+`GET /db/_changes/<ShardID>`
+
+will provide the change feed for a given shard using JSONL for all responses,
+but otherwise matching the existing format of the changes feed, while
+
+`GET /db/_changes/_meta?since=N`
+
+can be used to retrieve the shard topology as of a particular sequence.

Review comment:
       Yes, I realize I didn't fully specify that bit. The resharding does create its own sequence, and a call to the `_meta` endpoint with that sequence would return the new topology. I suppose the JSONL feed would have a final line indicating the presence of the tombstone at that sequence.
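
To make the `_meta?since=N` semantics concrete, here is a minimal sketch in Python of how a topology lookup could behave, assuming the `?CHANGES_SHARDS` entries are keyed by the first sequence at which each shard map is active, as in the RFC's examples. The `SHARD_MAPS` list, the `topology_as_of` helper, and the use of plain integers for sequences are all hypothetical stand-ins for illustration, not part of the proposal or of any existing CouchDB API.

```python
from bisect import bisect_right

# Hypothetical in-memory stand-in for the ?CHANGES_SHARDS entries, sorted by
# the first sequence at which each shard map becomes active. The values mirror
# the examples in the RFC text; real sequences need not be plain integers.
SHARD_MAPS = [
    (0, ["ShardID"]),
    (5678, ["ShardID1", "ShardID2", "ShardID3", "ShardID4"]),
]


def topology_as_of(shard_maps, since):
    """Return the shard list that is active at sequence `since`.

    The active map is the entry with the greatest start sequence that is
    less than or equal to `since`, so a lookup at the resharding sequence
    itself returns the new topology.
    """
    starts = [start for start, _ in shard_maps]
    idx = bisect_right(starts, since) - 1
    if idx < 0:
        raise ValueError("no shard map is active at sequence %r" % (since,))
    return shard_maps[idx][1]
```

Under these assumptions, `topology_as_of(SHARD_MAPS, 5677)` yields the single original shard, while `topology_as_of(SHARD_MAPS, 5678)` yields the four new shards, which matches the idea that the resharding event occupies its own sequence.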



