Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/12 16:05:30 UTC
[couchdb] branch 63012-scheduler updated: [fixup] Add developer-oriented README.md
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/63012-scheduler by this push:
new 2bde652 [fixup] Add developer-oriented README.md
2bde652 is described below
commit 2bde652105d59353050275485cb5865c3a084f73
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Wed Apr 12 12:05:24 2017 -0400
[fixup] Add developer-oriented README.md
---
src/couch_replicator/README.md | 297 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 297 insertions(+)
diff --git a/src/couch_replicator/README.md b/src/couch_replicator/README.md
new file mode 100644
index 0000000..1b0e01b
--- /dev/null
+++ b/src/couch_replicator/README.md
@@ -0,0 +1,297 @@
+Developer Oriented Replicator Description
+=========================================
+
+This description of the scheduling replicator's functionality is mainly geared
+toward CouchDB developers. It dives a bit into the internals and explains how
+everything is connected together.
+
+A natural place to start is the top application supervisor:
+`couch_replicator_sup`. It's a `rest_for_one` supervisor, so if a child process
+terminates, the rest of the children in the hierarchy following it are also
+terminated, and all of them are then restarted in order. This structure implies
+a useful constraint: children to the "right" (when viewing the tree vertically
+with the root at the top) can safely call children on the "left", because this
+supervisor ensures those on the "left" are already started and running.
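As a rough illustration of the `rest_for_one` strategy, here is a Python sketch (not OTP code) that computes which children are restarted when one of them exits. The list holds the first four children described below. Treating that order as the start order is an assumption, apart from `couch_replication_event`, which the text explicitly says is first. `restart_set` is a hypothetical helper.

```python
# First four children of couch_replicator_sup; the order is assumed to be
# the start order (only couch_replication_event being first is documented).
CHILDREN = [
    "couch_replication_event",
    "couch_replicator_clustering",
    "couch_replicator_connection",
    "couch_replicator_rate_limiter",
]

def restart_set(children, crashed):
    """Children that rest_for_one restarts when `crashed` exits:
    the crashed child plus every child started after it, in start order."""
    index = children.index(crashed)
    return children[index:]
```

Children to the left of the crashed one keep running. That is why a child may rely on everything to its left being available.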
+
+A description of each child:
+
+ * `couch_replication_event`: Starts a gen_event publication bus to handle some
+ replication-related events. This is used, for example, by the
+ `couch_replicator_clustering` process to publish cluster membership changes.
+ It is also used in replication tests to monitor for replication events.
+ Notification is performed via the `couch_replicator_notifier:notify/1`
+ function. It's the first (left-most) child because
+ `couch_replicator_clustering` uses it.
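The publish/subscribe role of this bus can be sketched in Python. This is only an analogy for `gen_event`, not its API: `EventBus`, `add_handler` and `notify` are hypothetical names. Here the handlers run synchronously in the caller, whereas in `gen_event` they run inside the event manager process.

```python
class EventBus:
    """Minimal publish/subscribe bus, loosely analogous to a gen_event manager."""

    def __init__(self):
        self._handlers = []

    def add_handler(self, handler):
        # Each handler is a callable that takes one event term.
        self._handlers.append(handler)

    def notify(self, event):
        # Deliver the event to every registered handler, in registration order.
        for handler in self._handlers:
            handler(event)


bus = EventBus()
received = []
bus.add_handler(received.append)
bus.notify(("cluster", "unstable"))
bus.notify(("cluster", "stable"))
```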
+
+ * `couch_replicator_clustering`: This module maintains cluster membership
+ information for the replication application and provides functions to check
+ ownership of replication jobs. A cluster membership change is published via
+ the `gen_event` event server set up in the `couch_replication_event` child
+ above. Published events are `{cluster, stable}`, when cluster membership has
+ stabilized (that is, it is not fluctuating anymore), and `{cluster, unstable}`,
+ which indicates there was a recent change to the cluster membership and it is
+ now considered unstable. Listeners for cluster membership changes include
+ `couch_replicator_doc_processor` and `couch_replicator_db_changes`. When the
+ doc processor gets a `{cluster, stable}` event, it will remove all the
+ replication jobs not belonging to the current node. When
+ `couch_replicator_db_changes` gets a `{cluster, stable}` event, it will
+ restart the `couch_multidb_changes` process it controls, which will launch a
+ new scan of all the replicator databases.
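One way to model the stable/unstable transitions is a quiet-period timer. Any membership change marks the cluster unstable, and it becomes stable once no change has been seen for a while. This Python sketch assumes that mechanism and a hypothetical `quiet_period` parameter; the text does not specify how `couch_replicator_clustering` actually decides stability. Time is passed in explicitly to keep the sketch deterministic.

```python
class ClusterStability:
    """Publish ("cluster", "unstable") on membership changes and
    ("cluster", "stable") once `quiet_period` passes without one."""

    def __init__(self, quiet_period, publish):
        self.quiet_period = quiet_period   # hypothetical tuning knob
        self.publish = publish             # any callable accepting an event
        self.last_change = None            # time of the last membership change
        self.stable = True

    def membership_changed(self, now):
        self.last_change = now
        if self.stable:
            self.stable = False
            self.publish(("cluster", "unstable"))

    def tick(self, now):
        # Called periodically; declares stability after the quiet period.
        if not self.stable and now - self.last_change >= self.quiet_period:
            self.stable = True
            self.publish(("cluster", "stable"))
```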
+
+ * `couch_replicator_connection`: Maintains a global replication connection
+ pool. It allows reusing connections across replication tasks. The main
+ interface is `acquire/1` and `release/1`. The main idea here is that once a
+ connection is established, it is kept around for
+ `replicator.connection_close_interval` milliseconds in case another
+ replication task wants to re-use it. It is worth pointing out how linking
+ and monitoring are handled: workers are linked to the connection pool when
+ they are created. If they crash, the connection pool listens for the `EXIT`
+ message and cleans up. The connection pool also monitors owners (by
+ monitoring the `Pid` from the `From` argument in the call to `acquire/1`)
+ and cleans up if an owner dies. Another interesting detail is that
+ connection establishment (creation) happens in the owner process, so the
+ pool is not blocked on it.
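The reuse-with-timeout idea can be sketched in Python as a pool keyed by URL. Released connections stay idle for `close_interval` time units before being closed. `ConnectionPool`, `connect` and `close` are hypothetical. Owner monitoring and worker linking are left out, and time is passed in explicitly so the sketch is deterministic.

```python
class ConnectionPool:
    """Keyed connection pool with idle expiry (a sketch, not the Erlang API)."""

    def __init__(self, close_interval, connect, close):
        self.close_interval = close_interval  # role of connection_close_interval
        self.connect = connect                # creates a new connection
        self.close = close                    # closes an expired connection
        self.idle = {}                        # url -> list of (conn, released_at)

    def acquire(self, url):
        idle = self.idle.get(url)
        if idle:
            conn, _released_at = idle.pop()   # reuse an idle connection
            return conn
        # The connection is created in the caller's context, echoing how the
        # owner process (not the pool) establishes connections.
        return self.connect(url)

    def release(self, url, conn, now):
        self.idle.setdefault(url, []).append((conn, now))

    def close_idle(self, now):
        # Close connections that stayed idle longer than close_interval.
        for url, conns in list(self.idle.items()):
            keep = []
            for conn, released_at in conns:
                if now - released_at > self.close_interval:
                    self.close(conn)
                else:
                    keep.append((conn, released_at))
            self.idle[url] = keep
```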
+
+ * `couch_replicator_rate_limiter`: Implements a rate limiter to handle
+ connection throttling from sources or targets where requests return HTTP 429
+ (Too Many Requests) responses. It uses the Additive Increase /
+ Multiplicative Decrease (AIMD) feedback control algorithm to converge on the
+ channel capacity. It is implemented using a 16-way sharded ETS table to
+ maintain connection state. The table sharding code is split out into the
+ `couch_replicator_rate_limiter_tables` module. The main idea of the module is
+ to maintain and continually estimate an interval for each connection,
+ represented by `{Method, Url}`. The interval is updated on each call to
+ `failure/1` or `success/1`. `failure/1` is supposed to be called after a 429
+ is received and `success/1` when a successful request has been made. Also,
+ when no failures are happening, the code ensures the ETS tables are empty in
+ order to have a lower impact on a running system.
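The AIMD idea can be sketched in Python by tracking a per-`{Method, Url}` rate and deriving the interval from it. All constants are hypothetical and chosen for readability, not taken from CouchDB. A plain dict stands in for the sharded ETS table. A key is deleted once its rate fully recovers, which mirrors the "empty tables when nothing fails" behavior.

```python
class AimdRateLimiter:
    """Per-key AIMD sketch: additive rate increase on success,
    multiplicative rate decrease on a 429. Constants are hypothetical."""

    def __init__(self, max_rate=50.0, min_rate=0.04, increase=1.0, decrease=0.5):
        self.max_rate = max_rate   # requests/second when unthrottled
        self.min_rate = min_rate   # floor, so the interval stays bounded
        self.increase = increase   # additive step on success
        self.decrease = decrease   # multiplicative factor on failure
        self.rates = {}            # key -> current rate; absent = unthrottled

    def interval_ms(self, key):
        # Absent keys have never failed (or fully recovered): no delay.
        rate = self.rates.get(key)
        return 0.0 if rate is None else 1000.0 / rate

    def failure(self, key):
        # Multiplicative decrease of the rate after a 429 response.
        rate = self.rates.get(key, self.max_rate)
        self.rates[key] = max(self.min_rate, rate * self.decrease)

    def success(self, key):
        # Additive increase of the rate; forget the key once fully recovered
        # so the table stays empty while nothing is being throttled.
        rate = self.rates.get(key)
        if rate is None:
            return
        rate += self.increase
        if rate >= self.max_rate:
            del self.rates[key]
        else:
            self.rates[key] = rate
```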
+
+ * `couch_replicator_scheduler`: The scheduler is the core component of the
+ scheduling replicator. It allows handling a larger number of jobs than
+ might be possible to actively run on the cluster. It accomplishes this by
+ switching between jobs (stopping some and starting others) to ensure all
+ make progress. Replication jobs which fail are penalized using exponential
+ backoff. That is, each consecutive failure will double the time penalty.
+ This frees up system resources for more useful work than just continuously
+ trying to run the same subset of failing jobs.
+
+ The main API function is `add_job/1`. Its argument is an instance of the
+ `#rep{}` record, which could be the result of a document update from a
+ `_replicator` db or the result of a POST to the `_replicate` endpoint. Once
+ the replication job is added to the scheduler, it doesn't matter much where
+ it originated.
+
+ Internally, each job is represented by the `#job{}` record. It contains the
+ original `#rep{}` but also, among a few other things, maintains an event
+ history. The history is a sequence of events for each job. These are
+ timestamped and ordered such that the most recent event is at the head.
+ History length is limited based on the `replicator.max_history` config
+ value. The default is 20 entries. History event types are:
+
+ * `added`: job was just added to the scheduler. This is the first event.
+ * `started`: job was started. This was an attempt to run the job.
+ * `stopped`: job was stopped by the scheduler.
+ * `crashed`: job has crashed (instead of stopping cleanly).
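A bounded, newest-first history like this maps naturally onto a Python `collections.deque` with `maxlen`. `appendleft` puts the newest event at the head and silently drops the oldest one once the limit is reached. `JobHistory` is a hypothetical name. The limit of 20 matches the `replicator.max_history` default mentioned above.

```python
from collections import deque

class JobHistory:
    """Newest-first, length-bounded event history for one job (a sketch)."""

    EVENT_TYPES = {"added", "started", "stopped", "crashed"}

    def __init__(self, max_history=20):
        self.events = deque(maxlen=max_history)

    def record(self, event_type, timestamp):
        if event_type not in self.EVENT_TYPES:
            raise ValueError(f"unknown event type: {event_type}")
        # appendleft keeps the most recent event at the head; when the deque
        # is full, the oldest event falls off the right-hand end.
        self.events.appendleft((event_type, timestamp))

    def latest(self, event_type):
        """Timestamp of the most recent event of the given type, or None."""
        for kind, ts in self.events:
            if kind == event_type:
                return ts
        return None
```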
+
+ The core of the algorithm is the `reschedule/1` function. That function is
+ called every `replicator.interval` milliseconds (default is 60000, i.e. one
+ minute). During each call the scheduler will try to stop some jobs and start
+ some new ones, while keeping the number of running jobs no greater than
+ `replicator.max_jobs` (default is 500). So the function does these
+ operations (actual code paste):
+
+ ```erlang
+ Running = running_job_count(),
+ Pending = pending_job_count(),
+ stop_excess_jobs(State, Running),
+ start_pending_jobs(State, Running, Pending),
+ rotate_jobs(State, Running, Pending),
+ update_running_jobs_stats(State#state.stats_pid)
+ ```
+
+ `Running` is the total number of currently running jobs. `Pending` is the
+ total number of jobs waiting to be run. `stop_excess_jobs` will stop any
+ jobs exceeding the configured `replicator.max_jobs` limit. This code takes
+ effect if the user reduces the `max_jobs` configuration value.
+ `start_pending_jobs` will start pending jobs if there is room available.
+ This takes effect on startup or when the user increases the `max_jobs`
+ configuration value. `rotate_jobs` is where all the action happens. There,
+ the scheduler picks `replicator.max_churn` running jobs to stop and then
+ picks the same number of pending jobs to start. The default value of
+ `max_churn` is 20. So by default, every minute, 20 running jobs are stopped
+ and 20 new pending jobs are started.
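The arithmetic of one `reschedule/1` pass can be modeled in Python. `reschedule_counts` is a hypothetical helper with three simplifying assumptions:

- It ignores the continuous/normal distinction.
- Each step sees the counts left by the previous step.
- Jobs stopped as excess stay in the scheduler as pending.

It is an illustration, not the actual implementation.

```python
def reschedule_counts(running, pending, max_jobs=500, max_churn=20):
    """Simplified model of one reschedule/1 pass using job counts only."""
    # stop_excess_jobs: trim the running set down to max_jobs.
    stopped_excess = max(0, running - max_jobs)
    running -= stopped_excess
    pending += stopped_excess
    # start_pending_jobs: fill any free slots from the pending jobs.
    started = min(pending, max_jobs - running)
    running += started
    pending -= started
    # rotate_jobs: swap up to max_churn running jobs for pending ones.
    rotated = min(max_churn, running, pending)
    return {"stopped_excess": stopped_excess, "started": started,
            "rotated": rotated, "running": running, "pending": pending}
```

With the defaults and 1000 pending jobs, each pass rotates 20 jobs. A given pending job may therefore wait on the order of 1000 / 20 = 50 passes (about 50 minutes) for its turn.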
+
+ Before moving on, it is worth pointing out that the scheduler treats
+ continuous and non-continuous replications differently. Normal
+ (non-continuous) replications, once started, are allowed to run to
+ completion. This preserves their semantics of replicating a snapshot of the
+ source database to the target. For example, if new documents are added to
+ the source after the replication is started, those updates should not show
+ up on the target database. Stopping and restarting a normal replication
+ would violate that constraint. The only exception to the rule is when the
+ user explicitly reduces the `replicator.max_jobs` configuration value. Even
+ then, the scheduler will first attempt to stop as many continuous jobs as
+ possible, and it will stop normal jobs only if it has no choice left.
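The preference order for stopping excess jobs can be expressed as a sort key. In this Python sketch, `Job` and `pick_excess_to_stop` are hypothetical. Ordering jobs within each group by longest running first is an assumption; the text only guarantees that continuous jobs go before normal ones.

```python
from dataclasses import dataclass

@dataclass
class Job:
    name: str
    continuous: bool
    started: int   # timestamp of the most recent `started` event

def pick_excess_to_stop(running, max_jobs):
    """Pick running jobs to stop when more than max_jobs are running:
    continuous jobs first, normal jobs only if there is no other choice."""
    excess = len(running) - max_jobs
    if excess <= 0:
        return []
    # False sorts before True, so continuous jobs (key False) come first;
    # within each group the oldest `started` timestamp comes first.
    ordered = sorted(running, key=lambda job: (not job.continuous, job.started))
    return ordered[:excess]
```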
+
+ Keeping that in mind and going back to the scheduling algorithm, the next
+ interesting part is how the scheduler picks which jobs to stop and which
+ ones to start:
+
+ * Stopping: When picking jobs to stop, the scheduler will pick the longest
+ running continuous jobs first. The sorting callback function to get the
+ longest running jobs is unsurprisingly called `longest_running/2`. To
+ pick the longest running jobs it looks at the most recent `started`
+ event. After it gets a list sorted by longest running, it simply picks
+ the first few, up to the value of `max_churn`, using `lists:sublist/2`.
+ Then those jobs are stopped.
+
+ * Starting: When starting jobs, the scheduler will pick the jobs which have
+ been waiting the longest. Surprisingly, in this case it also looks at the
+ `started` timestamp and picks the jobs which have the oldest `started`
+ timestamp. If there are 3 jobs, A[started=10], B[started=7],
+ C[started=9], then B will be picked first, then C, then A. This ensures
+ that jobs are not starved, which is a classic scheduling pitfall.
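Both selection rules can be sketched in a few lines of Python. Jobs are plain `(name, started, continuous)` tuples, and `pick_to_stop`/`pick_to_start` are hypothetical helpers. The usage reuses the A/B/C example from the text.

```python
def pick_to_stop(running, max_churn):
    """Longest-running continuous jobs first, i.e. oldest `started` first.
    Normal (non-continuous) jobs are never rotated out."""
    continuous = [job for job in running if job[2]]
    longest_running = sorted(continuous, key=lambda job: job[1])
    return longest_running[:max_churn]   # like lists:sublist/2

def pick_to_start(pending, max_churn):
    """Longest-waiting pending jobs first, also by oldest `started`."""
    return sorted(pending, key=lambda job: job[1])[:max_churn]


pending = [("A", 10, True), ("B", 7, True), ("C", 9, True)]
order = [name for name, _started, _continuous in pick_to_start(pending, 3)]
```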
+
+ In the code, the list of pending jobs is picked slightly differently than
+ the list of running jobs. `pending_jobs/1` uses `ets:foldl` to iterate over
+ all the pending jobs. As it iterates, it tries to keep only up to
+ `max_churn` oldest items in the accumulator. The reason this is done is
+ that there could be a very large number of pending jobs, and loading them
+ all in a list (making a copy from ETS) and then sorting it can be quite
+ expensive performance-wise. The tricky part of the iteration happens in
+ `pending_maybe_replace/2`. A `gb_sets` ordered set is used to keep the
+ top-N longest-waiting jobs seen so far. The code has a comment with a
+ helpful example of how this algorithm works.
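The same bounded top-N idea can be sketched in Python. The sketch swaps in a different data structure: a size-`max_churn` max-heap (via `heapq` with negated timestamps) instead of a `gb_sets` ordered set. The single pass over `pending` stands in for `ets:foldl`, and `oldest_pending` is a hypothetical name.

```python
import heapq

def oldest_pending(pending, max_churn):
    """Return up to max_churn (started, name) pairs with the oldest `started`
    timestamps, in one pass and O(max_churn) extra memory."""
    if max_churn <= 0:
        return []
    heap = []  # max-heap on `started`, stored as (-started, name)
    for name, started in pending:            # one pass, like ets:foldl
        if len(heap) < max_churn:
            heapq.heappush(heap, (-started, name))
        elif started < -heap[0][0]:
            # Older than the newest job kept so far: replace that one.
            heapq.heapreplace(heap, (-started, name))
    return sorted((-neg_started, name) for neg_started, name in heap)
```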
+
+ The last part is how the scheduler treats jobs which keep crashing. If a
+ job is started but then crashes, that job is considered unhealthy. The
+ main idea is to penalize such jobs such that they are forced to wait an
+ exponentially larger amount of time with each consecutive crash. A central
+ part of this algorithm is determining what constitutes a sequence of
+ consecutive crashes. If a job starts then quickly crashes, and after the
+ next start it crashes again, then that would be a sequence of 2 consecutive
+ crashes. The penalty would then be calculated by the `backoff_micros/1`
+ function, where the consecutive crash count ends up as the exponent.
+ However, for practical reasons, there is also a maximum penalty, equivalent
+ to 10 consecutive crashes. Time-wise it ends up being about 8 hours. That
+ means even a job which keeps crashing will still get a chance to retry once
+ every 8 hours.
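Here is a Python sketch of the capped exponential penalty. The 30-second base interval is an assumption. It was chosen so that the cap (2^10 * 30 s = 30720 s, roughly 8.5 hours) lines up with the "about 8 hours" figure above. The function name mirrors `backoff_micros/1`, but this is not its source.

```python
BASE_MICROS = 30 * 1_000_000   # assumed base penalty: 30 seconds
MAX_EXPONENT = 10              # cap: the equivalent of 10 consecutive crashes

def backoff_micros(consecutive_crashes):
    """Penalty in microseconds: doubles with each consecutive crash, capped."""
    exponent = min(consecutive_crashes, MAX_EXPONENT)
    return BASE_MICROS * (1 << exponent)
```

With these values, a job that has crashed three times in a row waits 240 seconds (30 s * 2^3) before its next attempt.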
+
+ There is a subtlety when calculating consecutive crashes: deciding when the
+ sequence stops, that is, figuring out when a job becomes healthy again. The
+ scheduler considers a job healthy again if it started and hasn't crashed in
+ a while. The "in a while" part is a configuration parameter,
+ `replicator.health_threshold`, defaulting to 2 minutes. This means that if a
+ job has been crashing, for example 5 times in a row, but then on the 6th
+ attempt it started and ran for more than 2 minutes, it is considered
+ healthy again. The next time it crashes, its sequence of consecutive crashes
+ will restart at 1.
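The counting rule can be sketched as a walk over a newest-first history of `(event_type, timestamp)` pairs, using the event types listed earlier. This Python sketch is hypothetical and makes two assumptions:

- A crash counts toward the sequence only if it happened within `health_threshold` of the start it followed.
- A start that did not end in a crash also ends the sequence.

The count is meant to be evaluated right after a crash is recorded.

```python
def consecutive_crashes(history, health_threshold):
    """Count the current run of consecutive crashes in a newest-first history."""
    count = 0
    crash_time = None  # time of the crash that ended the start examined next
    for kind, ts in history:
        if kind == "crashed":
            count += 1
            crash_time = ts
        elif kind == "started":
            if crash_time is None or crash_time - ts > health_threshold:
                # This run did not crash, or ran long enough to count as
                # healthy: older crashes no longer belong to the sequence.
                break
            crash_time = None
    return count
```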
+
+ * `couch_replicator_scheduler_sup`: This module is a supervisor for running
+ replication tasks. The most interesting thing about it is perhaps that it is
+ not used to restart children. The scheduler handles restarts and
+ error-handling backoffs.
+
+ * `couch_replicator_doc_processor`: The doc processor component is in charge
+ of processing replication document updates, turning them into replication
+ jobs and adding those jobs to the scheduler. Unfortunately, the only reason
+ there is even a `couch_replicator_doc_processor` gen_server, instead of
+ replication documents being turned into jobs and inserted into the scheduler
+ directly, is one corner case: filtered replications using custom (mostly
+ JavaScript) filters. More about it later. It is better to start with how
+ updates flow through the doc processor:
+
+ Document updates come via the `db_change/3` callback from
+ `couch_multidb_changes`, then go to the `process_change/2` function.
+
+ In `process_change/2` a few decisions are made regarding how to proceed. The
+ first is an "ownership" check, that is, a check whether the replication
+ document belongs on the current node. If not, it is ignored. Another check
+ is whether the update has arrived during a time when the cluster is
+ considered "unstable". If so, it is ignored, because soon enough a rescan
+ will be launched and all the documents will be reprocessed anyway. Another
+ noteworthy thing in `process_change/2` is the handling of upgrades from the
+ previous version of the replicator, which wrote transient states to the
+ documents. Two such states were `triggered` and `error`. Both of those
+ states are removed from the document, then the update proceeds in the
+ regular fashion. `failed` documents are also ignored here. `failed` is a
+ terminal state which indicates the document was somehow unsuitable to become
+ a replication job (it was malformed or a duplicate). Otherwise, the update
+ proceeds to `process_updated/2`.
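The order of checks in `process_change/2` can be summarized as a Python decision function. `process_change_action` and its return values are hypothetical. The state is assumed to be read from the document's `_replication_state` field. States the text does not mention fall through to `process_updated` here, which may not match the real code.

```python
TRANSIENT_STATES = {"triggered", "error"}   # written by the previous replicator

def process_change_action(doc, owned_by_this_node, cluster_stable):
    """Decide what to do with a replicator doc update (a sketch)."""
    if not owned_by_this_node:
        return "ignore"          # another node owns this document
    if not cluster_stable:
        return "ignore"          # a rescan will reprocess it soon anyway
    state = doc.get("_replication_state")
    if state in TRANSIENT_STATES:
        return "strip_state_then_process"
    if state == "failed":
        return "ignore"          # terminal: malformed or duplicate doc
    return "process_updated"
```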
+
+ `process_updated/2` is where replication document updates are parsed and
+ translated to `#rep{}` records. The interesting part here is that the
+ replication ID isn't calculated yet. Unsurprisingly, the parsing function
+ used is called `parse_rep_doc_without_id/1`. Also note that up until now
+ everything is still running in the context of the `db_change/3`
+ callback. After the replication filter type is determined, the update is
+ finally passed to the `couch_replicator_doc_processor` gen_server.
+
+ The `couch_replicator_doc_processor` gen_server's main role is to try to
+ calculate a replication ID for each `#rep{}` record passed to it, then add
+ it as a scheduler job. As noted before, `#rep{}` records parsed up until
+ this point lack a replication ID. The reason is that the replication ID
+ calculation includes a hash of the filter code. Because user-defined
+ replication filters live in the source DB, fetching them most likely
+ involves a remote network request, so there is a possibility of blocking and
+ a need to handle various network failures and retries. Because of that, the
+ doc processor dispatches most of that blocking and retrying to a separate
+ `worker` process (`couch_replicator_doc_processor_worker` module).
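To see why a filter change must produce a new job, here is a Python sketch of an ID that hashes the filter source together with other replication parameters. The field names and the SHA-256-over-canonical-JSON scheme are assumptions for illustration only. The real algorithms live in `couch_replicator_ids`.

```python
import hashlib
import json

def replication_id(source, target, filter_code=None, options=None):
    """Hypothetical replication ID: a digest over a canonical JSON encoding
    of the parameters, including the user filter's source code."""
    payload = {
        "source": source,
        "target": target,
        "filter_code": filter_code,
        "options": options or {},
    }
    # sort_keys and fixed separators make the encoding deterministic.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```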
+
+ `couch_replicator_doc_processor_worker` is where replication IDs are
+ calculated for each individual doc update. There are two separate modules
+ which contain utilities related to replication ID calculation:
+ `couch_replicator_ids` and `couch_replicator_filters`. The first one
+ contains ID calculation algorithms and the second one knows how to parse and
+ fetch user filters from a remote source DB. One interesting thing about the
+ worker is that it is time-bounded and is guaranteed not to be stuck forever.
+ That's why it spawns an extra process with `spawn_monitor`, just so it can
+ use an `after` clause in `receive` to bound the maximum time this worker
+ will take.
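The `spawn_monitor` plus `receive ... after` trick has a rough Python analogue: run the work in a helper thread and wait on a queue with a timeout. This sketch uses the hypothetical `run_bounded`. It differs from the Erlang version in one important way: Python threads cannot be killed. A timed-out helper is therefore simply abandoned (it is a daemon thread), and its late result is ignored.

```python
import queue
import threading

def run_bounded(fun, timeout_s):
    """Run fun() in a helper thread and wait at most timeout_s seconds.
    Returns ("ok", result), ("error", exception) or ("error", "timeout")."""
    inbox = queue.Queue(maxsize=1)

    def helper():
        try:
            inbox.put(("ok", fun()))
        except Exception as exc:   # report crashes instead of dying silently
            inbox.put(("error", exc))

    threading.Thread(target=helper, daemon=True).start()
    try:
        # Analogous to `receive ... after Timeout -> ... end`.
        return inbox.get(timeout=timeout_s)
    except queue.Empty:
        # Unlike Erlang, the helper cannot be killed; its result is dropped.
        return ("error", "timeout")
```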
+
+ A doc processor worker will either succeed or fail but never block for too
+ long. Success and failure are returned as exit values. Those are handled in
+ the `worker_returned/3` doc processor clauses. The most common pattern is
+ that a worker is spawned to add a replication job, it does so, and it
+ returns an `{ok, ReplicationID}` value, which is handled in `worker_returned`.
+
+ In case of a filtered replication with custom user code, there are two cases
+ to consider:
+
+ 1. Filter fetching code has failed. In that case the worker returns an
+ error. But because the error could be a transient network error, another
+ worker is started to try again. It could fail and return an error again,
+ then another one is started, and so on. However, each consecutive worker
+ will do an exponential backoff, not unlike the scheduler code.
+ `error_backoff/1` is where the backoff period is calculated.
+ Consecutive errors are held in the `errcnt` field in the ETS table.
+
+ 2. Fetching filter code succeeds, the replication ID is calculated, and the
+ job is added to the scheduler. However, because this is a filtered
+ replication, the source database could get an updated filter, which means
+ the replication ID should change again. So a worker is spawned again even
+ if the previous worker just returned successfully. The purpose is to check
+ the filter and see if it changed. In other words, the doc processor will
+ periodically check filtered replications, fetch any updated filter, and
+ then refresh the replication job (remove the old one and add a new one
+ with a different ID). The filter checking interval is determined by the
+ `filter_backoff` function. An unusual thing about that function is that
+ it calculates the period based on the size of the ETS table. The
+ intuition is that when there are few replications in a cluster, it's OK
+ to check filters for changes often, but when there are lots of
+ replications running, having each one check its filter often is not a
+ good idea.
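Here is a Python sketch of a size-scaled check interval. The linear scaling and the 30-second floor / one-hour ceiling are hypothetical. The text only says the period grows with the number of entries in the ETS table.

```python
MIN_INTERVAL_S = 30          # hypothetical floor for small clusters
MAX_INTERVAL_S = 3600        # hypothetical ceiling for very large ones
SECONDS_PER_ENTRY = 1        # hypothetical per-replication scaling factor

def filter_backoff(table_size):
    """Seconds to wait before re-checking a filter, growing with table size."""
    interval = table_size * SECONDS_PER_ENTRY
    return max(MIN_INTERVAL_S, min(MAX_INTERVAL_S, interval))
```

Between the floor and the ceiling, N replications are each checked every N seconds. That keeps the total filter-check load at roughly one check per second regardless of N.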
+
+ * `couch_replicator`: This is an unusual but useful pattern. This child is
+ not an actual process but a one-time call to the
+ `couch_replicator:ensure_rep_db_exists/0` function, executed by the
+ supervisor in the correct order (and monitored for crashes). This ensures
+ the local replicator db exists, then returns `ignore`. This pattern is
+ useful for doing setup-like things at the top level and in the correct
+ order with regard to the rest of the children in the supervisor.
+
+ * `couch_replicator_db_changes`: This process specializes and configures
+ `couch_multidb_changes` so that it looks for `_replicator`-suffixed shards
+ and makes sure to restart it when cluster membership changes. This restart
+ on cluster membership changes is often referred to as a "rescan".
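Matching replicator shards by suffix can be sketched as follows. Two points are assumptions, not part of `couch_multidb_changes`:

- The shard path layout is `shards/<range>/<dbname>.<suffix>`.
- `shard_dbname` and `is_replicator_shard` are hypothetical helpers.

```python
def shard_dbname(shard_path):
    """Strip the assumed `shards/<range>/` prefix and trailing `.<suffix>`."""
    parts = shard_path.split("/", 2)
    name = parts[2] if len(parts) == 3 and parts[0] == "shards" else shard_path
    base, _dot, _suffix = name.rpartition(".")
    return base or name

def is_replicator_shard(shard_path):
    # Plain suffix match on the database name, as described in the text.
    return shard_dbname(shard_path).endswith("_replicator")
```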
+
+