Posted to notifications@couchdb.apache.org by GitBox <gi...@apache.org> on 2021/04/07 14:54:44 UTC

[GitHub] [couchdb-documentation] kocolosk commented on a change in pull request #651: Add RFC on sharded changes

kocolosk commented on a change in pull request #651:
URL: https://github.com/apache/couchdb-documentation/pull/651#discussion_r608731337



##########
File path: rfcs/018-sharded-changes-feeds.md
##########
@@ -0,0 +1,238 @@
+---
+name: Formal RFC
+about: Submit a formal Request For Comments for consideration by the team.
+title: 'High Throughput Parallel _changes Feed'
+labels: rfc, discussion
+assignees: ''
+
+---
+
+# Introduction
+
+This proposal is designed to improve indexing throughput, reduce hot spots for
+write-intensive workloads, and offer a horizontally-scalable API for consumers
+to process the change capture feed for an individual database in CouchDB 4.0.
+
+## Abstract
+
+The current implementation on `main` writes all changes feed entries for a given
+database into a single `?DB_CHANGES` subspace in FoundationDB. The view indexing
+system (c.f. [RFC 008](008-map-indexes.md#index-building)) uses a single worker
+for each design document that processes all the entries for that changes feed.
+High-throughput writers can overwhelm that indexer so that it never brings the
+view up to date. The previous RFC mentions parallelizing the build as a future
+optimization. Well, here we are.
+
+The parallelization technique proposed herein shards the changes feed itself
+into multiple subspaces. This reduces the write load on any single underlying
+FoundationDB storage server. We also introduce a new external API for accessing
+these individual shards directly to ensure that consumers can scale out to keep
+up with write-intensive workloads without needing to build their own system to
+farm out changes from a single feed to multiple workers.
+
+Shard counts on a database can vary over time as needed, but previous entries
+are not re-sharded. We sketch how an indexer can process the individual sharded
+feeds in parallel without sacrificing the isolation semantics of the secondary
+index (i.e., that it observes the state of the underlying database as it existed
+at some specific sequence). Sequence numbers are globally unique and totally
+ordered across shards.
+
+## Requirements Language
+
+The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
+"SHOULD", "SHOULD NOT", "RECOMMENDED",  "MAY", and "OPTIONAL" in this
+document are to be interpreted as described in
+[RFC 2119](https://www.rfc-editor.org/rfc/rfc2119.txt).
+
+## Terminology
+
+**changes shard**: a subspace in FoundationDB into which some portion of the
+changes feed entries for that database are written. It is not directly related
+to the underlying storage server shards in FoundationDB itself.
+
+---
+
+# Detailed Description
+
+## Data Model
+
+The existing `?DB_CHANGES` subspace will be deprecated (i.e. renamed as
+`?DB_CHANGES_DEPRECATED`) and a new `?DB_CHANGES` subspace will be created. This
+subspace will contain an additional nested level with the individual shard
+identifiers. Within each shard the data model is unchanged from before.
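+
+For illustration only, assuming the per-entry value format from the current
+implementation is carried over unchanged (the names below are placeholders, not
+part of this proposal), the old and new key layouts look roughly like:
+
+`{?DB_CHANGES_DEPRECATED, Sequence} = (DocId, ...)`
+
+`{?DB_CHANGES, ShardId, Sequence} = (DocId, ...)`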
+
+## Routing
+
+Documents will be routed to shards using a configurable hashing scheme. The
+default scheme will use consistent hashing on the partition key, so that a) all
+updates to a given document will land in the same shard, and b) documents from
+the same partition in a partitioned database will also be colocated. This
+simplifies matters for a consumer processing the individual shard feeds in
+parallel, as it can ignore the possibility of observing out-of-order updates to
+the same document from different shards, and it furthermore allows the
+computation of per-partition statistics (e.g. windowing functions over meter
+readings in the canonical IoT device use case for partitions).
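+
+A minimal sketch of the default routing, in Erlang for concreteness. It
+substitutes plain modulo hashing for the configurable consistent-hashing scheme
+described above, and the module, function, and variable names are illustrative
+rather than part of this proposal:
+
+```erlang
+%% Sketch only: plain modulo hashing stands in for the configurable
+%% consistent-hashing scheme; all names here are illustrative.
+-module(changes_shard_routing_sketch).
+-export([shard_for_doc/2]).
+
+%% In a partitioned database the partition key is the doc id prefix before
+%% the first ":"; otherwise the whole doc id is hashed.
+partition_key(DocId) ->
+    case binary:split(DocId, <<":">>) of
+        [Partition, _Rest] -> Partition;
+        [_NoPartition] -> DocId
+    end.
+
+%% Pick a shard from the currently active shard map for this update.
+shard_for_doc(DocId, ShardIds) when ShardIds =/= [] ->
+    Index = erlang:phash2(partition_key(DocId), length(ShardIds)),
+    lists:nth(Index + 1, ShardIds).
+```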
+
+## Resharding
+
+The shard count for a database can change over time. When the shard count
+changes, a new set of `ShardIds` in the `?DB_CHANGES` subspace is created, and
+all future updates to that database will be routed to those new subspaces.
+Consumers of the shard-level API will receive a notification that a resharding
+event has occurred once they reach the end of the updates committed to the
+previous subspace. They MUST re-connect to the new endpoints once they receive
+that notification in order to receive any additional updates.
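+
+A logic-level sketch of that reconnect rule from a consumer's point of view;
+`shard_client` and its functions are hypothetical placeholders rather than a
+proposed client API:
+
+```erlang
+%% Hypothetical consumer loop; shard_client is a placeholder module.
+-module(shard_consumer_sketch).
+-export([follow_shard/3]).
+
+follow_shard(Db, ShardId, Since) ->
+    case shard_client:stream_changes(Db, ShardId, Since) of
+        {resharded, BoundarySeq} ->
+            %% ShardId will never receive another update. Fetch the shard
+            %% map that became active at BoundarySeq so the caller can
+            %% reconnect its workers to the new shards.
+            {reconnect, BoundarySeq, shard_client:shard_map_at(Db, BoundarySeq)};
+        {caught_up, LastSeq} ->
+            %% No resharding event yet; keep following this shard.
+            follow_shard(Db, ShardId, LastSeq)
+    end.
+```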
+
+## Metadata
+
+We will extend the `?DB_CONFIG` subspace to add new information about the
+changes shards in a new `?CHANGES_SHARDS` nested subspace. This metadata will
+include the first sequence at which the new shard topology is active, the ID of
+the hashing scheme being used for that shard map, and a list of the associated
+`ShardIds`. For example, a newly-created DB will have the following entry
+indicating it only has a single shard:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID]}`
+
+Increasing the shard count to 4 at Sequence 5678 will cause the following entry
+to be added:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 5678} = {DefaultHashingScheme, [ShardID1, ShardID2, ShardID3, ShardID4]}`
+
+Resharding should also update the previous `?CHANGES_SHARDS` entry, adding a
+tombstone flag to mark that shard map as retired:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {DefaultHashingScheme, [ShardID], Tombstone}`
+
+As mentioned previously, `ShardID` values are always unique and never reused.
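+
+For illustration, assuming the `?CHANGES_SHARDS` range has been read into an
+ascending list of `{StartSeq, Value}` pairs (a representation chosen only for
+this sketch), resolving the entry in effect at a given sequence is a simple
+scan:
+
+```erlang
+%% Sketch: pick the entry with the largest StartSeq that is still =< Seq.
+%% Entries are assumed to be sorted ascending by StartSeq, as a range read
+%% of the ?CHANGES_SHARDS subspace would return them.
+-module(shard_map_lookup_sketch).
+-export([active_shard_map/2]).
+
+active_shard_map(Seq, Entries) ->
+    lists:last([E || {StartSeq, _Value} = E <- Entries, StartSeq =< Seq]).
+```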
+
+### Backwards Compatibility
+
+Existing databases will receive an entry in this subspace formatted like
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, 0} = {?DB_CHANGES_DEPRECATED}`
+
+and then a new one immediately thereafter indicating that new entries will land in a new subspace:
+
+`{?DB_CONFIG, ?CHANGES_SHARDS, CurrentSeq} = {DefaultHashingScheme, [ShardID]}`
+
+## Write Path
+
+Writers that are updating a particular document need to remove the previous
+entry for that document. The metadata that we maintain above is sufficient to
+calculate a ShardID given a partition key and a sequence, so we do not need to
+store the ShardID of the previous update directly in the document metadata.
+
+Once the previous entry is found and removed, the writer publishes the new
+update into the appropriate shard given the current shard map.
+
+Writers MUST NOT commit updates to a ShardID that has been replaced as part of a
+resharding event. This can be avoided by ensuring that the current
+`?CHANGES_SHARDS` entry is included in the read conflict set for the
+transaction, so that if a resharding event takes place underneath it, the
+current write transaction will fail (because of the tombstone commit).
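+
+A sketch of that write path in terms of the erlfdb bindings. The subspace
+constant, key layout, and the `shard_meta` helpers are assumptions made for the
+example (versionstamped key handling is elided), and
+`changes_shard_routing_sketch` refers to the routing sketch above:
+
+```erlang
+%% Illustrative only; constants, key layouts, and helpers are assumptions.
+-module(changes_write_sketch).
+-export([update_changes_entry/5]).
+
+-define(DB_CHANGES, 100).  %% placeholder value, not the real constant
+
+update_changes_entry(Tx, DbPrefix, DocId, OldSeq, NewSeq) ->
+    %% Resolve the shard that held the previous entry and the shard that
+    %% should receive the new one (see the routing and metadata sketches).
+    {CurEpochKey, CurShardIds} = shard_meta:current_entry(Tx, DbPrefix),
+    OldShardId = shard_meta:shard_for_seq(Tx, DbPrefix, DocId, OldSeq),
+    NewShardId = changes_shard_routing_sketch:shard_for_doc(DocId, CurShardIds),
+
+    %% Remove the previous changes entry for this document, if any.
+    OldKey = erlfdb_tuple:pack({?DB_CHANGES, OldShardId, OldSeq}, DbPrefix),
+    erlfdb:clear(Tx, OldKey),
+
+    %% Write the new entry into the currently active shard (simplified; the
+    %% real implementation would use a versionstamped key here).
+    NewKey = erlfdb_tuple:pack({?DB_CHANGES, NewShardId, NewSeq}, DbPrefix),
+    erlfdb:set(Tx, NewKey, DocId),
+
+    %% Read-conflict on the current ?CHANGES_SHARDS entry so that a concurrent
+    %% resharding event (which rewrites that entry with a tombstone) aborts
+    %% this transaction and forces a retry against the new shard map.
+    erlfdb:add_read_conflict_key(Tx, CurEpochKey).
+```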
+
+## Read Path
+
+Readers who are connected directly to the shard-level changes feed will retrieve
+the shard topology for the database as of the `since` sequence from which they
+want to start. This retrieval must account for the possibility that some of the
+requested changes still reside in the deprecated subspace.
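+
+Continuing the representation used in the Metadata sketch (`?CHANGES_SHARDS`
+entries as an ascending list of `{StartSeq, Value}` pairs, where the value may
+be the deprecated-subspace marker), a reader starting from `Since` can compute
+which epochs it still needs to visit. Purely illustrative:
+
+```erlang
+%% Sketch: every epoch whose successor begins after Since may still hold
+%% entries the reader needs, including the deprecated single subspace.
+-module(read_path_sketch).
+-export([epochs_to_read/2]).
+
+epochs_to_read(Since, Entries) ->
+    Starts = [StartSeq || {StartSeq, _Value} <- Entries],
+    Ends = tl(Starts) ++ [infinity],
+    [{max(Since, StartSeq), Value}
+     || {{StartSeq, Value}, EndSeq} <- lists:zip(Entries, Ends),
+        EndSeq > Since].
+```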
+
+## Indexers
+
+Updating a view group should be thought of as a single "job" composed of a set of
+"tasks" that are executed in parallel, one for each shard. Some coordination is
+required at the beginning and the end of the job: all tasks within the job
+should start from the same snapshot of the underlying database, and when they
+complete they should also have observed the same snapshot of the underlying
+database. If tasks need to acquire new snapshots along the way because of the
+large number of updates they need to process, they can do so without
+coordination, but in the end the parent job MUST ensure that all tasks have
+updated to the same final snapshot.
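+
+A coordination sketch for that job/task structure, with a hypothetical
+`view_task` module standing in for the per-shard indexing work:
+
+```erlang
+%% Hypothetical coordination sketch; view_task is a placeholder module for
+%% the per-shard indexing work and returns the final sequence it observed.
+-module(view_job_sketch).
+-export([update_view_group/2]).
+
+update_view_group(Db, ShardIds) ->
+    %% Every task starts from the same snapshot of the database.
+    TargetSeq = view_task:current_db_seq(Db),
+    Tasks = [spawn_monitor(fun() ->
+                 exit({done, view_task:update_shard(Db, Id, TargetSeq)})
+             end) || Id <- ShardIds],
+    FinalSeqs = [receive {'DOWN', Ref, process, Pid, {done, Seq}} -> Seq end
+                 || {Pid, Ref} <- Tasks],
+    %% Tasks may have advanced past TargetSeq if they had to acquire fresh
+    %% snapshots along the way; the job MUST bring every task up to the same
+    %% final sequence before the view group is considered up to date.
+    case lists:usort(FinalSeqs) of
+        [_SameEverywhere] ->
+            ok;
+        _ ->
+            CatchUpSeq = lists:max(FinalSeqs),
+            [view_task:update_shard(Db, Id, CatchUpSeq) || Id <- ShardIds],
+            ok
+    end.
+```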

Review comment:
       Yes, with the default hashing scheme each indexer task can commit to the same secondary index independently since they are operating on disjoint sets of documents. If, on the other hand, you had a hashing scheme where you just sprayed document updates randomly across shards you might need to add some extra bookkeeping to avoid cases where two tasks are concurrently operating on different versions of the same document (and maybe committing updates in the wrong order, i.e. overwriting index entries from a newer version with an older one).
   
   A resharding event would need to act as a barrier here; the indexer job would need to allow all its tasks to finish on the old shard map and then start a new job with the new map.
   
   I suppose if you wanted to run fewer indexing tasks than the number of shards you could certainly do that and have each task work on one or more shards in series without a ton of extra complexity. Running _more_ tasks than the number of shards is a different proposal. Maybe possible, but wasn't my focus here. Others might be able to comment on why we haven't done that already.
   



