Posted to commits@couchdb.apache.org by rn...@apache.org on 2019/01/30 14:33:30 UTC

[couchdb-smoosh] branch master created (now cd85523)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb-smoosh.git.


      at cd85523  Initial import

This branch includes the following new commits:

     new cd85523  Initial import

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb-smoosh] 01/01: Initial import

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb-smoosh.git

commit cd855235b5696d6b41cf124b14dde43405898acf
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Jan 30 14:16:08 2019 +0000

    Initial import
---
 README.md                     | 140 +++++++++++
 operator_guide.md             | 396 ++++++++++++++++++++++++++++++
 src/smoosh.app.src            |  29 +++
 src/smoosh.erl                |  69 ++++++
 src/smoosh_app.erl            |  28 +++
 src/smoosh_channel.erl        | 279 +++++++++++++++++++++
 src/smoosh_priority_queue.erl |  85 +++++++
 src/smoosh_server.erl         | 545 ++++++++++++++++++++++++++++++++++++++++++
 src/smoosh_sup.erl            |  38 +++
 src/smoosh_utils.erl          |  58 +++++
 10 files changed, 1667 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..9f9a480
--- /dev/null
+++ b/README.md
@@ -0,0 +1,140 @@
+Smoosh
+======
+
+Smoosh is CouchDB's auto-compaction daemon. It is notified when
+databases and views are updated and may then elect to enqueue them for
+compaction.
+
+API
+---
+
+All API functions are in smoosh.erl and only the exported functions in
+this module should be called from outside of the smoosh application.
+
+Additionally, smoosh responds to config changes dynamically and these
+changes are the principal means of interacting with smoosh.
+
+Top-Level Settings
+------------------
+
+The main settings one interacts with are:
+
+<dl>
+<dt>db_channels<dd>A comma-separated list of channel names for
+databases.
+<dt>view_channels<dd>A comma-separated list of channel names for
+views.
+<dt>staleness<dd>The number of minutes that the (expensive) priority
+calculation can be stale before it is recalculated. Defaults to 5.
+</dl>
+
+Sometimes it's necessary to use the following:
+
+<dl>
+<dt>cleanup_index_files</dt><dd>Whether smoosh cleans up the files
+for indexes that have been deleted. Defaults to false and probably
+shouldn't be changed unless the cluster is running low on disk space,
+and only after considering the ramifications.</dd>
+<dt>wait_secs</dt><dd>The time a channel waits before starting compactions,
+allowing it to observe the system and make a smarter decision about what
+to compact first. Rarely changed from the default. Defaults to 30 (seconds).
+</dd>
+</dl>
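+
+All of these live in the top-level `smoosh` config section. For example, to
+relax the staleness window to 10 minutes (an illustrative value; the
+cluster-wide variant of this call is shown in the config examples further
+down):
+
+    config:set("smoosh", "staleness", "10").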
+
+Channel Settings
+----------------
+
+A channel has several important settings that control runtime
+behavior.
+
+<dl>
+<dt>capacity<dd>The maximum number of items the channel can hold (lowest priority item is removed to make room for new items). Defaults to 9999.
+<dt>concurrency<dd>The maximum number of jobs that can run concurrently. Defaults to 1.
+<dt>max_priority<dd>The item must have a priority lower than this to be enqueued. Defaults to infinity.
+<dt>max_size<dd>The item must be no larger than this many bytes in length to be enqueued. Defaults to infinity.
+<dt>min_priority<dd>The item must have a priority at least this high to be enqueued. Defaults to 5.0 for ratio and 16 MB for slack.
+<dt>min_changes<dd>The minimum number of changes since the last compaction before the item will be enqueued. Defaults to 0. Currently only works for databases.
+<dt>min_size<dd>The item must be at least this many bytes in length to be enqueued. Defaults to 1 MB (1048576 bytes).
+<dt>priority<dd>The method used to calculate priority. Can be ratio (calculated as disk_size/data_size) or slack (calculated as disk_size-data_size). Defaults to ratio.
+</dl>
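+
+These are set per channel under a `smoosh.<channel_name>` config section, and
+the values must be strings. For example, to require at least a 4.0 ratio and a
+16 MB file before the ratio_dbs channel will accept an item (illustrative
+values):
+
+    config:set("smoosh.ratio_dbs", "min_priority", "4.0").
+    config:set("smoosh.ratio_dbs", "min_size", "16777216").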
+
+Structure
+---------
+
+Smoosh consists of a central gen_server (smoosh_server) which manages
+a number of subordinate smoosh_channel gen_servers. This is not
+properly managed by OTP yet.
+
+Compaction Scheduling Algorithm
+-------------------------------
+
+Smoosh decides whether to compact a database or view by evaluating the
+item against the selection criteria of each _channel_ in the order
+they are configured. By default there are two channels for databases
+("ratio_dbs" and "slack_dbs"), and two channels for views ("ratio_views"
+and "slack_views")
+
+Smoosh will enqueue the new item to the first channel that accepts
+it. If none accept it, the item is not enqueued for compaction.
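+
+A minimal sketch of that selection loop (the function name is illustrative;
+the real logic is `find_channel/3` in `smoosh_server.erl`, which also applies
+the staleness check and the `smoosh.ignore` settings):
+
+    first_accepting([], _Object) ->
+        false;                                         % no channel wants it
+    first_accepting([Channel | Rest], Object) ->
+        case get_priority(Channel, Object) of
+            0        -> first_accepting(Rest, Object); % rejected, try next
+            Priority -> {ok, Channel, Priority}        % enqueue here
+        end.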
+
+Notes on the data_size value
+----------------------------
+
+Every database and view shard has a data_size value. In CouchDB this
+accurately reflects the post-compaction file size. In DbCore, it is
+the size of the file that we bill for. It excludes the b+tree and
+database footer overhead. We also bill customers for the uncompressed
+size of their documents, though we store them compressed on disk.
+These two systems were developed independently (ours predates
+CouchDB's) and DbCore only calculates the billing size value.
+
+Because of the way our data_size is currently calculated, it can
+sometimes be necessary to enqueue databases and views with very low
+ratios. Due to this, it is also currently impossible to tell how
+optimally compacted a cluster is.
+
+Example config commands
+-----------------------
+
+Change the set of database channels:
+
+    config:set("smoosh", "db_channels", "small_dbs,medium_dbs,large_dbs").
+
+Change the set of database channels on all live nodes in the cluster:
+
+    rpc:multicall(config, set, ["smoosh", "db_channels", "small_dbs,medium_dbs,large_dbs"]).
+
+Change the concurrency of the ratio_dbs database channel to 2:
+
+    config:set("smoosh.ratio_dbs", "concurrency", "2").
+
+Change it on all live nodes in the cluster:
+
+    rpc:multicall(config, set, ["smoosh.ratio_dbs", "concurrency", "2"]).
+
+Example API commands
+--------------------
+
+smoosh:status()
+
+This prints the state of each channel: how many jobs it is
+currently running and how many jobs are enqueued (as well as the
+lowest and highest priority of those enqueued items). The idea is to
+provide, at a glance, sufficient insight into smoosh that an operator
+can assess whether smoosh is adequately targeting the reclaimable
+space in the cluster. In general, a healthy status output will have
+items in the ratio_dbs and ratio_views channels. Owing to the default
+settings, the slack_dbs and slack_views channels will almost certainly
+have items in them. Historically, we've not found that the slack
+channels, on their own, are particularly adept at keeping things well
+compacted.
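+
+The output is a list of per-channel property lists; abridged, it looks like
+this (a fuller example appears in operator_guide.md):
+
+    {ok,[{"ratio_dbs",
+          [{active,1},
+           {starting,0},
+           {waiting,[{size,522},
+                     {min,{5.001569007970237,{1378,394651,323864}}},
+                     {max,{981756.5441159063,{1380,370286,655752}}}]}]},
+         ...]}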
+
+smoosh:enqueue_all_dbs(), smoosh:enqueue_all_views()
+
+These functions do just what they say, but should not generally need to
+be called; smoosh is supposed to be autonomous. Call them if you get
+alerted to a disk space issue, as they might well help. If they do, that
+indicates a bug in smoosh, as it should already have enqueued eligible
+shards once they met the configured settings.
+
+
+
diff --git a/operator_guide.md b/operator_guide.md
new file mode 100644
index 0000000..a0c9810
--- /dev/null
+++ b/operator_guide.md
@@ -0,0 +1,396 @@
+# An operator's guide to smoosh
+
+Smoosh is the auto-compactor for the databases. It automatically selects and
+processes the compaction of database shards on each node.
+
+## Smoosh Channels
+
+Smoosh works using the concept of channels. A channel is essentially a queue of pending
+compactions. There are separate sets of channels for database and view compactions. Each
+channel is assigned a configuration which defines whether a compaction ends up in
+the channel's queue and how compactions are prioritised within that queue.
+
+Smoosh takes each channel and works through the compactions queued in each in priority
+order. Each channel is processed concurrently, so the priority levels only matter within
+a given channel.
+
+Finally, each channel has an assigned number of active compactions, which defines how
+many compactions happen for that channel in parallel. For example, a cluster with
+a lot of database churn but few views might require more active compactions for the
+database channel(s).
+
+It's important to remember that a channel is local to a dbcore node; that is,
+each node maintains and processes an independent set of compactions.
+
+### Channel configuration options
+
+#### Channel types
+
+Each channel has a basic type for the algorithm it uses to select pending
+compactions for its queue and how it prioritises them.
+
+The two queue types are:
+
+* **ratio**: this uses the ratio `total_bytes / user_bytes` as its driving
+calculation. The result _X_ must be greater than some configurable value _Y_ for a
+compaction to be added to the queue. Compactions are then prioritised for
+higher values of _X_.
+
+* **slack**: this uses `total_bytes - user_bytes` as its driving calculation.
+The result _X_ must be greater than some configurable value _Y_ for a compaction
+to be added to the queue. Compactions are prioritised for higher values of _X_.
+
+In both cases, _Y_ is set using the `min_priority` configuration variable. The
+calculation of _X_ is described in [Priority calculation](#priority-calculation), below.
+
+Both algorithms operate on two main measures:
+
+* **user_bytes**: this is the amount of data the user has in the file. It
+doesn't include storage overhead: old revisions, on-disk btree structure and
+so on.
+
+* **total_bytes**: the size of the file on disk.
+
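+As a rough worked example (illustrative numbers): a shard with `total_bytes`
+of 6 GB and `user_bytes` of 1 GB has a ratio of 6.0 and a slack of 5 GB.
+Against the default `min_priority` of 5.0 (ratio) or 16 MB (slack), it would
+qualify for either channel type, and ends up in whichever accepting channel
+is checked first.
+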
+Channel type is set using the `priority` configuration setting.
+
+#### Further configuration options
+
+Beyond its basic type, there are several other configuration options which
+can be applied to a queue.
+
+*All options MUST be set as strings.* See the [smoosh readme][srconfig] for
+all settings and their defaults.
+
+#### Priority calculation
+
+The algorithm type and certain configuration options feed into the priority
+calculation.
+
+The priority is calculated when a compaction is enqueued. As each channel
+has a different configuration, each channel will end up with a different
+priority value. The enqueue code checks each channel in turn to see whether the
+compaction passes its configured priority threshold (`min_priority`). Once
+a channel is found that can accept the compaction, the compaction is added
+to that channel's queue and the enqueue process stops. Therefore the
+ordering of channels has a bearing on which channel a compaction ends up in.
+
+If you want to follow this along, the call order is all in `smoosh_server`,
+`enqueue_request -> find_channel -> get_priority`.
+
+The priority calculation is probably the easiest way to understand the effects
+of configuration variables. It's defined in `smoosh_server#get_priority/3`,
+currently [here][ss].
+
+[ss]: https://github.com/apache/couchdb-smoosh/blob/master/src/smoosh_server.erl#L277
+[srconfig]: https://github.com/apache/couchdb-smoosh#channel-settings
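+
+A condensed sketch of that function's core decision, for orientation only
+(the real code also handles `min_size`, `max_size`, `max_priority` and the
+`upgrade` channel type; a result of 0 means the channel rejects the
+compaction):
+
+```erlang
+%% Simplified from smoosh_server:get_priority/4.
+priority("ratio", DiskSize, DataSize, MinPriority) when DataSize > 0 ->
+    Ratio = DiskSize / DataSize,
+    if Ratio =< MinPriority -> 0;      % below threshold: not enqueued
+       true                 -> Ratio   % enqueued, prioritised by the ratio
+    end;
+priority("slack", DiskSize, DataSize, MinPriority) when DataSize > 0 ->
+    Slack = DiskSize - DataSize,
+    if Slack =< MinPriority -> 0;
+       true                 -> Slack
+    end.
+```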
+
+#### Background Detail
+
+`user_bytes` is called `data_size` in `db_info` blocks. It is the total of all bytes
+that are used to store docs and their attachments.
+
+Since `.couch` files are append only, every update adds data to the file. When
+you update a btree, a new leaf node is written, along with all the nodes back
+up to the root. Old data is never overwritten, so those parts of the
+file are no longer live; this includes old btree nodes and document bodies.
+Compaction takes this file and writes a new file that only contains live data.
+
+`total_bytes` is the number of bytes in the file as reported by `ls -al filename`.
+
+#### Flaws
+
+An important flaw in this calculation is that `total_bytes` takes into account
+the compression of data on disk, whereas `user_bytes` does not. This can give
+unexpected results, as the two values are not directly comparable.
+
+However, it's the best measure we currently have.
+
+[Even more info](https://github.com/apache/couchdb-smoosh#notes-on-the-data_size-value).
+
+
+### Defining a channel
+
+Defining a channel is done via normal dbcore configuration, with some
+convention as to the parameter names.
+
+Channel configuration is defined using `smoosh.channel_name` top level config
+options. Defining a channel is just setting the various options you want
+for the channel, then bringing it into smoosh's sets of active channels by
+adding it to either `db_channels` or `view_channels`.
+
+This means that smoosh channels can be defined either for a single node or
+globally across a cluster, by setting the configuration either locally or
+globally. In the example below, we set up a new global channel.
+
+It's important to choose good channel names. There are some conventional ones:
+
+* `ratio_dbs`: a ratio channel for dbs, usually using the default settings.
+* `slack_dbs`: a slack channel for dbs, usually using the default settings.
+* `ratio_views`: a ratio channel for views, usually using the default settings.
+* `slack_views`: a slack channel for views, usually using the default settings.
+
+These four are defined by default if there are no others set ([source][source1]).
+
+[source1]: https://github.com/apache/couchdb-smoosh/blob/master/src/smoosh_server.erl#L75
+
+And some standard names for ones we often have to add:
+
+* `big_dbs`: a ratio channel for only enqueuing large database shards. What
+  _large_ means is very workload specific.
+
+Channels have certain defaults for their configuration, defined in the
+[smoosh readme][srconfig]. It's only necessary to set how this channel
+differs from those defaults. Below, we just need to set the `min_size` and
+`concurrency` settings, and allow the `priority` to default to `ratio`
+along with the other defaults.
+
+```bash
+# Define the new channel
+(couchdb@db1.foo.bar)3> s:set_config("smoosh.big_dbs", "min_size", "20000000000", global).
+{[ok,ok,ok],[]}
+(couchdb@db1.foo.bar)3> s:set_config("smoosh.big_dbs", "concurrency", "2", global).
+{[ok,ok,ok],[]}
+
+# Add the channel to the db_channels set -- note we need to get the original
+# value first so we can add the new one to the existing list!
+(couchdb@db1.foo.bar)5> s:get_config("smoosh", "db_channels", global).
+{[{'couchdb@db1.foo.bar',"ratio_dbs"},
+{'couchdb@db3.foo.bar',"ratio_dbs"},
+{'couchdb@db2.foo.bar',"ratio_dbs"}],
+[]}
+(couchdb@db1.foo.bar)6> s:set_config("smoosh", "db_channels", "ratio_dbs,big_dbs", global).
+{[ok,ok,ok],[]}
+```
+
+### Viewing active channels
+
+```bash
+(couchdb@db3.foo.bar)3> s:get_config("smoosh", "db_channels", global).
+{[{'couchdb@db3.foo.bar',"ratio_dbs,big_dbs"},
+  {'couchdb@db1.foo.bar',"ratio_dbs,big_dbs"},
+  {'couchdb@db2.foo.bar',"ratio_dbs,big_dbs"}],
+ []}
+(couchdb@db3.foo.bar)4> s:get_config("smoosh", "view_channels", global).
+{[{'couchdb@db3.foo.bar',"ratio_views"},
+  {'couchdb@db1.foo.bar',"ratio_views"},
+  {'couchdb@db2.foo.bar',"ratio_views"}],
+ []}
+```
+
+### Removing a channel
+
+```bash
+# Remove it from the active set
+(couchdb@db1.foo.bar)5> s:get_config("smoosh", "db_channels", global).
+{[{'couchdb@db1.foo.bar',"ratio_dbs,big_dbs"},
+{'couchdb@db3.foo.bar',"ratio_dbs,big_dbs"},
+{'couchdb@db2.foo.bar',"ratio_dbs,big_dbs"}],
+[]}
+(couchdb@db1.foo.bar)6> s:set_config("smoosh", "db_channels", "ratio_dbs", global).
+{[ok,ok,ok],[]}
+
+# Delete the config -- you need to do each value
+(couchdb@db1.foo.bar)3> rpc:multicall(config, delete, ["smoosh.big_dbs", "concurrency"]).
+{[ok,ok,ok],[]}
+(couchdb@db1.foo.bar)3> rpc:multicall(config, delete, ["smoosh.big_dbs", "min_size"]).
+{[ok,ok,ok],[]}
+```
+
+### Getting channel configuration
+
+As far as I know, you have to get each setting separately:
+
+```
+(couchdb@db1.foo.bar)1> s:get_config("smoosh.big_dbs", "concurrency", global).
+{[{'couchdb@db3.foo.bar',"2"},
+  {'couchdb@db1.foo.bar',"2"},
+  {'couchdb@db2.foo.bar',"2"}],
+ []}
+
+```
+
+### Setting channel configuration
+
+As when defining a channel, you just need to set the new value:
+
+```
+(couchdb@db1.foo.bar)2> s:set_config("smoosh.ratio_dbs", "concurrency", "1", global).
+{[ok,ok,ok],[]}
+```
+
+It sometimes takes a little while to take effect.
+
+
+
+## Standard operating procedures
+
+There are a few standard things that operators often have to do when responding
+to pages.
+
+In addition to the below, in some circumstances it's useful to define new
+channels with certain properties (`big_dbs` is a common one) if smoosh isn't
+selecting and prioritising compactions that well.
+
+### Checking smoosh's status
+
+You can see the queued items for each channel by going into `remsh` on a node
+and using:
+
+```
+> smoosh:status().
+{ok,[{"ratio_dbs",
+      [{active,1},
+       {starting,0},
+       {waiting,[{size,522},
+                 {min,{5.001569007970237,{1378,394651,323864}}},
+                 {max,{981756.5441159063,{1380,370286,655752}}}]}]},
+     {"slack_views",
+      [{active,1},
+       {starting,0},
+       {waiting,[{size,819},
+                 {min,{16839814,{1375,978920,326458}}},
+                 {max,{1541336279,{1380,370205,709896}}}]}]},
+     {"slack_dbs",
+      [{active,1},
+       {starting,0},
+       {waiting,[{size,286},
+                 {min,{19004944,{1380,295245,887295}}},
+                 {max,{48770817098,{1380,370185,876596}}}]}]},
+     {"ratio_views",
+      [{active,1},
+       {starting,0},
+       {waiting,[{size,639},
+                 {min,{5.0126340031149335,{1380,186581,445489}}},
+                 {max,{10275.555632057285,{1380,370411,421477}}}]}]}]}
+```
+
+This gives you the node-local status for each queue.
+
+Under each channel there is some information about the channel:
+
+* `active`: number of current compactions in the channel.
+* `starting`: number of compactions starting-up.
+* `waiting`: number of queued compactions.
+  * `min` and `max` give an idea of the queued jobs' effectiveness. The values
+    for these are obviously dependent on whether the queue is ratio or slack.
+
+For ratio queues, the default minimum for smoosh to enqueue a compaction is 5. In
+the example above, we can guess that 981,756 is quite high. This could be a
+small database, however, so it doesn't necessarily mean useful compactions
+from the point of view of reclaiming disk space.
+
+For this example, we can see that there are quite a lot of queued compactions,
+but we don't know which would be most effective to run to reclaim disk space.
+It's also worth noting that the waiting queue sizes are only meaningful
+relative to other factors on the cluster (e.g., db number and size).
+
+
+### Smoosh IOQ priority
+
+This is a global setting which affects all channels. Increasing it allows each
+active compaction to (hopefully) proceed faster as the compaction work is of
+a higher priority relative to other jobs. Decreasing it (hopefully) has the
+converse effect.
+
+By this point you'll [know whether smoosh is backing up](#checking-smooshs-status).
+If it's falling behind (big queues), try increasing compaction priority.
+
+Smoosh's IOQ priority is controlled via the `ioq` -> `compaction` queue.
+
+```
+> s:get_config("ioq", "compaction", global).
+{[{'couchdb@db1.foo.bar',undefined},
+  {'couchdb@db2.foo.bar',undefined},
+  {'couchdb@db3.foo.bar',undefined}],
+ []}
+
+```
+
+Priority by convention runs 0 to 1, though the priority can be any positive
+number. The default for compaction is 0.01; pretty low.
+
+If it looks like smoosh has a bunch of work that it's not getting
+through, priority can be increased. However, be careful that this
+doesn't adversely impact the customer experience. If it will, and
+it's urgent, at least drop them a warning.
+
+```
+> s:set_config("ioq", "compaction", "0.5", global).
+{[ok,ok,ok],[]}
+```
+
+In general, this should be a temporary measure. For some clusters,
+a change from the default may be required to help smoosh keep up
+with particular workloads.
+
+### Granting specific channels more workers
+
+Giving smoosh a higher concurrency for a given channel can allow a backlog
+in that channel to catch up.
+
+Again, some clusters run best with specific channels having more workers.
+
+From [assessing disk space](#assess-the-space-on-the-disk), you should
+know whether the biggest offenders are db or view files. From this,
+you can infer whether it's worth giving a specific smoosh channel a
+higher concurrency.
+
+The current setting can be seen for a channel like so:
+
+```
+> s:get_config("smoosh.ratio_dbs", "concurrency", global).
+{[{'couchdb@db1.foo.bar',undefined},
+  {'couchdb@db2.foo.bar',undefined},
+  {'couchdb@db3.foo.bar',undefined}],
+ []}
+```
+
+`undefined` means the default is used.
+
+If we knew that DB files were the major consumer of disk space, we might want
+to increase the concurrency of a `_dbs` channel. Experience shows `ratio_dbs`
+is often best, but evaluate this based on the current status.
+
+If we want to increase the ratio_dbs setting:
+
+```
+> s:set_config("smoosh.ratio_dbs", "concurrency", "2", global).
+{[ok,ok,ok],[]}
+```
+
+### Suspending smoosh
+
+If smoosh itself is causing issues, it's possible to suspend its operation.
+This differs from either `application:stop(smoosh).` or setting all channels'
+concurrency to zero because it both pauses ongoing compactions and keeps
+the channel queues intact.
+
+If, for example, a node's compactions are causing disk space issues, smoosh
+could be suspended while working out which channel is causing the problem. For
+example, a big_dbs channel might be creating huge compaction-in-progress
+files if there's not much in the shard to compact away.
+
+It's therefore useful when testing whether smoosh is causing a
+problem.
+
+```
+# suspend
+smoosh:suspend().
+
+# resume a suspended smoosh
+smoosh:resume().
+```
+
+Suspend is currently pretty literal: `erlang:suspend_process(Pid, [unless_suspending])`
+is called for each compaction process in each channel. `resume_process` is called
+for resume.
+
+### Restarting Smoosh
+
+Restarting Smoosh is a long shot: a brute-force approach in the hope that
+when Smoosh rescans the DBs it makes the right decisions. If you need to take
+this step, contact rnewson or davisp so that they can inspect Smoosh and find the bug.
+
+```
+> exit(whereis(smoosh_server), kill), smoosh:enqueue_all_dbs(), smoosh:enqueue_all_views().
+```
diff --git a/src/smoosh.app.src b/src/smoosh.app.src
new file mode 100644
index 0000000..a6cdb7f
--- /dev/null
+++ b/src/smoosh.app.src
@@ -0,0 +1,29 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, smoosh,
+ [
+  {description, "Auto-compaction daemon"},
+  {vsn, git},
+  {registered, [smoosh_server]},
+  {applications, [
+                  kernel,
+                  stdlib,
+                  couch_log,
+                  config,
+                  couch_event,
+                  couch,
+                  mem3
+                 ]},
+  {mod, { smoosh_app, []}},
+  {env, []}
+ ]}.
diff --git a/src/smoosh.erl b/src/smoosh.erl
new file mode 100644
index 0000000..676e7fa
--- /dev/null
+++ b/src/smoosh.erl
@@ -0,0 +1,69 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-export([suspend/0, resume/0, enqueue/1, status/0]).
+-export([enqueue_all_dbs/0, enqueue_all_dbs/1, enqueue_all_views/0]).
+
+suspend() ->
+    smoosh_server:suspend().
+
+resume() ->
+    smoosh_server:resume().
+
+enqueue(Object) ->
+    smoosh_server:enqueue(Object).
+
+sync_enqueue(Object) ->
+    smoosh_server:sync_enqueue(Object).
+
+sync_enqueue(Object, Timeout) ->
+    smoosh_server:sync_enqueue(Object, Timeout).
+
+status() ->
+    smoosh_server:status().
+
+enqueue_all_dbs() ->
+    fold_local_shards(fun(#shard{name=Name}, _Acc) ->
+        sync_enqueue(Name) end, ok).
+
+enqueue_all_dbs(Timeout) ->
+    fold_local_shards(fun(#shard{name=Name}, _Acc) ->
+        sync_enqueue(Name, Timeout) end, ok).
+
+enqueue_all_views() ->
+    fold_local_shards(fun(#shard{name=Name}, _Acc) ->
+        catch enqueue_views(Name) end, ok).
+
+fold_local_shards(Fun, Acc0) ->
+    mem3:fold_shards(fun(Shard, Acc1) ->
+        case node() == Shard#shard.node of
+            true ->
+                Fun(Shard, Acc1);
+            false ->
+                Acc1
+        end
+    end, Acc0).
+
+enqueue_views(ShardName) ->
+    DbName = mem3:dbname(ShardName),
+    {ok, DDocs} = fabric:design_docs(DbName),
+    [sync_enqueue({ShardName, id(DDoc)}) || DDoc <- DDocs].
+
+id(#doc{id=Id}) ->
+    Id;
+id({Props}) ->
+    couch_util:get_value(<<"_id">>, Props).
diff --git a/src/smoosh_app.erl b/src/smoosh_app.erl
new file mode 100644
index 0000000..eba3579
--- /dev/null
+++ b/src/smoosh_app.erl
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+    smoosh_sup:start_link().
+
+stop(_State) ->
+    ok.
diff --git a/src/smoosh_channel.erl b/src/smoosh_channel.erl
new file mode 100644
index 0000000..58d3ce7
--- /dev/null
+++ b/src/smoosh_channel.erl
@@ -0,0 +1,279 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_channel).
+-behaviour(gen_server).
+-vsn(1).
+-include_lib("couch/include/couch_db.hrl").
+
+% public api.
+-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
+-export([enqueue/3, last_updated/2, flush/1]).
+
+% gen_server api.
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+    code_change/3, terminate/2]).
+
+% records.
+
+-record(state, {
+    active=[],
+    name,
+    waiting=smoosh_priority_queue:new(),
+    paused=true,
+    starting=[]
+}).
+
+% public functions.
+
+start_link(Name) ->
+    gen_server:start_link(?MODULE, Name, []).
+
+suspend(ServerRef) ->
+    gen_server:call(ServerRef, suspend).
+
+resume(ServerRef) ->
+    gen_server:call(ServerRef, resume).
+
+enqueue(ServerRef, Object, Priority) ->
+    gen_server:cast(ServerRef, {enqueue, Object, Priority}).
+
+last_updated(ServerRef, Object) ->
+    gen_server:call(ServerRef, {last_updated, Object}).
+
+get_status(ServerRef) ->
+    gen_server:call(ServerRef, status).
+
+close(ServerRef) ->
+    gen_server:call(ServerRef, close).
+
+flush(ServerRef) ->
+    gen_server:call(ServerRef, flush).
+
+% gen_server functions.
+
+init(Name) ->
+    schedule_unpause(),
+    {ok, #state{name=Name}}.
+
+handle_call({last_updated, Object}, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
+    {reply, LastUpdated, State};
+
+handle_call(suspend, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    #state{active = Active} = State,
+    [catch erlang:suspend_process(Pid, [unless_suspending])
+        || {_,Pid} <- Active],
+    {reply, ok, State#state{paused=true}};
+
+handle_call(resume, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    #state{active = Active} = State,
+    [catch erlang:resume_process(Pid) || {_,Pid} <- Active],
+    {reply, ok, State#state{paused=false}};
+
+handle_call(status, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {reply, {ok, [
+        {active, length(State#state.active)},
+        {starting, length(State#state.starting)},
+        {waiting, smoosh_priority_queue:info(State#state.waiting)}
+    ]}, State};
+
+handle_call(close, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {stop, normal, ok, State};
+
+handle_call(flush, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {reply, ok, State#state{waiting=smoosh_priority_queue:new()}}.
+
+handle_cast({enqueue, _Object, 0}, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {noreply, State};
+handle_cast({enqueue, Object, Priority}, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}.
+
+% We accept noproc here due to possibly having monitored a restarted compaction
+% pid after it finished.
+handle_info({'DOWN', Ref, _, Job, Reason}, State0)  when Reason == normal;
+        Reason == noproc ->
+    {ok, State} = code_change(nil, State0, nil),
+    #state{active=Active, starting=Starting} = State,
+    {noreply, maybe_start_compaction(
+                State#state{active=lists:keydelete(Job, 2, Active),
+                            starting=lists:keydelete(Ref, 1, Starting)})};
+
+handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    #state{active=Active0, starting=Starting0} = State,
+    case lists:keytake(Job, 2, Active0) of
+        {value, {Key, _Pid}, Active1} ->
+            couch_log:warning("exit for compaction of ~p: ~p", [
+                smoosh_utils:stringify(Key), Reason]),
+            {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+            {noreply, maybe_start_compaction(State#state{active=Active1})};
+        false ->
+            case lists:keytake(Ref, 1, Starting0) of
+                {value, {_, Key}, Starting1} ->
+                    couch_log:warning("failed to start compaction of ~p: ~p", [
+                        smoosh_utils:stringify(Key), Reason]),
+                    {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+                    {noreply, maybe_start_compaction(State#state{starting=Starting1})};
+                false ->
+                    {noreply, State}
+            end
+    end;
+
+handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
+    {ok, State} = code_change(nil, State0, nil),
+    case lists:keytake(Ref, 1, State#state.starting) of
+        {value, {_, Key}, Starting1} ->
+            couch_log:notice("~s: Started compaction for ~s",
+                     [State#state.name, smoosh_utils:stringify(Key)]),
+            erlang:monitor(process, Pid),
+            erlang:demonitor(Ref, [flush]),
+            {noreply, State#state{active=[{Key, Pid}|State#state.active],
+                                  starting=Starting1}};
+        false ->
+            {noreply, State}
+    end;
+
+handle_info(pause, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {noreply, State#state{paused=true}};
+handle_info(unpause, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {noreply, maybe_start_compaction(State#state{paused=false})}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, #state{}=State, _Extra) ->
+    {ok, State}.
+
+% private functions.
+
+add_to_queue(Key, Priority, State) ->
+    #state{active=Active,waiting=Q} = State,
+    case lists:keymember(Key, 1, Active) of
+    true ->
+        State;
+    false ->
+        Capacity = list_to_integer(smoosh_utils:get(State#state.name, "capacity", "9999")),
+        couch_log:notice(
+            "~s: adding ~p to internal compactor queue with priority ~p",
+                 [State#state.name, Key, Priority]),
+        State#state{
+            waiting=smoosh_priority_queue:in(Key, Priority, Priority, Capacity, Q)
+        }
+    end.
+
+maybe_start_compaction(#state{paused=true}=State) ->
+    State;
+maybe_start_compaction(State) ->
+    Concurrency = list_to_integer(smoosh_utils:get(State#state.name,
+        "concurrency", "1")),
+    if length(State#state.active) + length(State#state.starting) < Concurrency ->
+        case smoosh_priority_queue:out(State#state.waiting) of
+        false ->
+            State;
+        {Key, Priority, Q} ->
+            try
+                State2 = case start_compact(State, Key) of
+                false ->
+                    State;
+                State1 ->
+                    couch_log:notice(
+                        "~s: Starting compaction for ~s (priority ~p)",
+                        [State#state.name, smoosh_utils:stringify(Key), Priority]),
+                    State1
+                end,
+                maybe_start_compaction(State2#state{waiting=Q})
+            catch Class:Exception ->
+                couch_log:notice("~s: ~p ~p for ~s",
+                    [State#state.name, Class, Exception,
+                        smoosh_utils:stringify(Key)]),
+                maybe_start_compaction(State#state{waiting=Q})
+            end
+        end;
+    true ->
+        State
+    end.
+
+start_compact(State, {schema, DbName, GroupId}) ->
+    case smoosh_utils:ignore_db({DbName, GroupId}) of
+        false ->
+            {ok, Pid} = couch_md_index_manager:get_group_pid(DbName,
+                GroupId),
+            Ref = erlang:monitor(process, Pid),
+            Pid ! {'$gen_call', {self(), Ref}, compact},
+            State#state{starting=[{Ref, {schema, DbName,
+                GroupId}} | State#state.starting]};
+        _ ->
+            false
+    end;
+
+start_compact(State, DbName) when is_list(DbName) ->
+    start_compact(State, ?l2b(DbName));
+start_compact(State, DbName) when is_binary(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try start_compact(State, Db) after couch_db:close(Db) end;
+start_compact(State, {Shard,GroupId}) ->
+    case smoosh_utils:ignore_db({Shard, GroupId}) of
+    false ->
+        DbName = mem3:dbname(Shard),
+        {ok, Pid} = couch_index_server:get_index(
+                couch_mrview_index, Shard, GroupId),
+        spawn(fun() -> cleanup_index_files(DbName, Shard) end),
+        Ref = erlang:monitor(process, Pid),
+        Pid ! {'$gen_call', {self(), Ref}, compact},
+        State#state{starting=[{Ref, {Shard, GroupId}}|State#state.starting]};
+    _ ->
+        false
+    end;
+start_compact(State, Db) ->
+    case smoosh_utils:ignore_db(Db) of
+    false ->
+        DbPid = couch_db:get_pid(Db),
+        Key = couch_db:name(Db),
+        case couch_db:get_compactor_pid(Db) of
+            nil ->
+                Ref = erlang:monitor(process, DbPid),
+                DbPid ! {'$gen_call', {self(), Ref}, start_compact},
+                State#state{starting=[{Ref, Key}|State#state.starting]};
+            % database is still compacting so we can just monitor the existing
+            % compaction pid
+            CPid ->
+                couch_log:notice("Db ~s continuing compaction",
+                    [smoosh_utils:stringify(Key)]),
+                erlang:monitor(process, CPid),
+                State#state{active=[{Key, CPid}|State#state.active]}
+        end;
+    _ ->
+        false
+    end.
+
+schedule_unpause() ->
+    WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
+    erlang:send_after(WaitSecs * 1000, self(), unpause).
+
+cleanup_index_files(DbName, _Shard) ->
+    case config:get("smoosh", "cleanup_index_files", "false") of
+    "true" ->
+        fabric:cleanup_index_files(DbName);
+    _ ->
+        ok
+    end.
diff --git a/src/smoosh_priority_queue.erl b/src/smoosh_priority_queue.erl
new file mode 100644
index 0000000..b7ede55
--- /dev/null
+++ b/src/smoosh_priority_queue.erl
@@ -0,0 +1,85 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_priority_queue).
+
+-export([new/0, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+
+-record(priority_queue, {
+    dict=dict:new(),
+    tree=gb_trees:empty()
+}).
+
+new() ->
+    #priority_queue{}.
+
+last_updated(Key, #priority_queue{dict=Dict}) ->
+    case dict:find(Key, Dict) of
+        {ok, {_, LastUpdated}} ->
+            LastUpdated;
+        error ->
+            false
+    end.
+
+is_key(Key, #priority_queue{dict=Dict}) ->
+    dict:is_key(Key, Dict).
+
+in(Key, Value, Priority, Q) ->
+    in(Key, Value, Priority, infinity, Q).
+
+in(Key, Value, Priority, Capacity, #priority_queue{dict=Dict, tree=Tree}) ->
+    Tree1 = case dict:find(Key, Dict) of
+        {ok, TreeKey} ->
+            gb_trees:delete_any(TreeKey, Tree);
+        error ->
+            Tree
+    end,
+    TreeKey1 = {Priority, now()},
+    Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1),
+    Dict1 = dict:store(Key, TreeKey1, Dict),
+    truncate(Capacity, #priority_queue{dict=Dict1, tree=Tree2}).
+
+out(#priority_queue{dict=Dict,tree=Tree}) ->
+    case gb_trees:is_empty(Tree) of
+    true ->
+        false;
+    false ->
+        {_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree),
+        Dict1 = dict:erase(Key, Dict),
+        {Key, Value, #priority_queue{dict=Dict1, tree=Tree1}}
+    end.
+
+size(#priority_queue{tree=Tree}) ->
+    gb_trees:size(Tree).
+
+info(#priority_queue{tree=Tree}=Q) ->
+    [{size, ?MODULE:size(Q)}|
+     case gb_trees:is_empty(Tree) of
+         true ->
+             [];
+         false ->
+             {Min, _, _} = gb_trees:take_smallest(Tree),
+             {Max, _, _} = gb_trees:take_largest(Tree),
+             [{min, Min}, {max, Max}]
+     end].
+
+truncate(infinity, Q) ->
+    Q;
+truncate(Capacity, Q) when Capacity > 0 ->
+    truncate(Capacity, ?MODULE:size(Q), Q).
+
+truncate(Capacity, Size, Q) when Size =< Capacity ->
+    Q;
+truncate(Capacity, Size, #priority_queue{dict=Dict, tree=Tree}) when Size > 0 ->
+    {_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree),
+    Q1 = #priority_queue{dict=dict:erase(Key, Dict), tree=Tree1},
+    truncate(Capacity, ?MODULE:size(Q1), Q1).
diff --git a/src/smoosh_server.erl b/src/smoosh_server.erl
new file mode 100644
index 0000000..d0e064b
--- /dev/null
+++ b/src/smoosh_server.erl
@@ -0,0 +1,545 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_server).
+-behaviour(gen_server).
+-vsn(3).
+-behaviour(config_listener).
+-include_lib("couch/include/couch_db.hrl").
+
+% public api.
+-export([
+    start_link/0,
+    suspend/0,
+    resume/0,
+    enqueue/1,
+    sync_enqueue/1,
+    sync_enqueue/2,
+    handle_db_event/3,
+    status/0
+]).
+
+% gen_server api.
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+    code_change/3, terminate/2]).
+
+% config_listener api
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+% exported but for internal use.
+-export([enqueue_request/2]).
+
+% private records.
+
+-record(state, {
+    db_channels=[],
+    view_channels=[],
+    schema_channels=[],
+    tab,
+    event_listener,
+    waiting=dict:new()
+}).
+
+-record(channel, {
+    name,
+    pid
+}).
+
+% public functions.
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+suspend() ->
+    gen_server:call(?MODULE, suspend).
+
+resume() ->
+    gen_server:call(?MODULE, resume).
+
+status() ->
+    gen_server:call(?MODULE, status).
+
+enqueue(Object) ->
+    gen_server:cast(?MODULE, {enqueue, Object}).
+
+sync_enqueue(Object) ->
+    gen_server:call(?MODULE, {enqueue, Object}).
+
+sync_enqueue(Object, Timeout) ->
+    gen_server:call(?MODULE, {enqueue, Object}, Timeout).
+
+handle_db_event(DbName, local_updated, St) ->
+    smoosh_server:enqueue(DbName),
+    {ok, St};
+handle_db_event(DbName, updated, St) ->
+    smoosh_server:enqueue(DbName),
+    {ok, St};
+handle_db_event(DbName, {index_commit, IdxName}, St) ->
+    smoosh_server:enqueue({DbName, IdxName}),
+    {ok, St};
+handle_db_event(DbName, {schema_updated, DDocId}, St) ->
+    smoosh_server:enqueue({schema, DbName, DDocId}),
+    {ok, St};
+handle_db_event(_DbName, _Event, St) ->
+    {ok, St}.
+
+% gen_server functions.
+
+init([]) ->
+    process_flag(trap_exit, true),
+    ok = config:listen_for_changes(?MODULE, nil),
+    {ok, Pid} = start_event_listener(),
+    DbChannels = smoosh_utils:split(
+                   config:get("smoosh", "db_channels", "upgrade_dbs,ratio_dbs,slack_dbs")),
+    ViewChannels = smoosh_utils:split(
+                     config:get("smoosh", "view_channels", "upgrade_views,ratio_views,slack_views")),
+    SchemaChannels = smoosh_utils:split(config:get("smoosh",
+        "schema_channels", "ratio_schemas,slack_schemas")),
+    Tab = ets:new(channels, [{keypos, #channel.name}]),
+    {ok, create_missing_channels(#state{
+        db_channels=DbChannels,
+        view_channels=ViewChannels,
+        schema_channels=SchemaChannels,
+        event_listener=Pid,
+        tab=Tab
+    })}.
+
+handle_config_change("smoosh", "db_channels", L, _, _) ->
+    {ok, gen_server:cast(?MODULE, {new_db_channels, smoosh_utils:split(L)})};
+handle_config_change("smoosh", "view_channels", L, _, _) ->
+    {ok, gen_server:cast(?MODULE, {new_view_channels, smoosh_utils:split(L)})};
+handle_config_change("smoosh", "schema_channels", L, _, _) ->
+    {ok, gen_server:cast(?MODULE, {new_schema_channels, smoosh_utils:split(L)})};
+handle_config_change(_, _, _, _, _) ->
+    {ok, nil}.
+
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(_, _, _) ->
+    spawn(fun() ->
+        timer:sleep(5000),
+        config:listen_for_changes(?MODULE, nil)
+    end).
+
+handle_call(status, _From, State) ->
+    Acc = ets:foldl(fun get_channel_status/2, [], State#state.tab),
+    {reply, {ok, Acc}, State};
+
+handle_call({enqueue, Object}, _From, State) ->
+    {noreply, NewState} = handle_cast({enqueue, Object}, State),
+    {reply, ok, NewState};
+
+handle_call(suspend, _From, State) ->
+    ets:foldl(fun(#channel{name=Name, pid=P}, _) ->
+        couch_log:notice("Suspending ~p", [Name]),
+        smoosh_channel:suspend(P) end, 0,
+        State#state.tab),
+    {reply, ok, State};
+
+handle_call(resume, _From, State) ->
+    ets:foldl(fun(#channel{name=Name, pid=P}, _) ->
+        couch_log:notice("Resuming ~p", [Name]),
+        smoosh_channel:resume(P) end, 0,
+        State#state.tab),
+    {reply, ok, State}.
+
+handle_cast({new_db_channels, Channels}, State) ->
+    [smoosh_channel:close(channel_pid(State#state.tab, C)) ||
+        C <- State#state.db_channels -- Channels],
+    {noreply, create_missing_channels(State#state{db_channels=Channels})};
+
+handle_cast({new_view_channels, Channels}, State) ->
+    [smoosh_channel:close(channel_pid(State#state.tab, C)) ||
+        C <- State#state.view_channels -- Channels],
+    {noreply, create_missing_channels(State#state{view_channels=Channels})};
+
+handle_cast({new_schema_channels, Channels}, State) ->
+    [smoosh_channel:close(channel_pid(State#state.tab, C)) ||
+        C <- State#state.schema_channels -- Channels],
+    {noreply, create_missing_channels(State#state{schema_channels=Channels})};
+
+handle_cast({enqueue, Object}, State) ->
+    #state{waiting=Waiting}=State,
+    case dict:is_key(Object, Waiting) of
+        true ->
+            {noreply, State};
+        false ->
+            {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]),
+            {noreply, State#state{waiting=dict:store(Object, Ref, Waiting)}}
+    end.
+
+handle_info({'EXIT', Pid, Reason}, #state{event_listener=Pid}=State) ->
+        couch_log:notice("update notifier died ~p", [Reason]),
+        {ok, Pid1} = start_event_listener(),
+        {noreply, State#state{event_listener=Pid1}};
+handle_info({'EXIT', Pid, Reason}, State) ->
+    couch_log:notice("~p ~p died ~p", [?MODULE, Pid, Reason]),
+    case ets:match_object(State#state.tab, #channel{pid=Pid, _='_'}) of
+    [#channel{name=Name}] ->
+        ets:delete(State#state.tab, Name);
+    _ ->
+        ok
+    end,
+    {noreply, create_missing_channels(State)};
+
+handle_info({'DOWN', Ref, _, _, _}, State) ->
+    Waiting = dict:filter(fun(_Key, Value) -> Value =/= Ref end,
+                          State#state.waiting),
+    {noreply, State#state{waiting=Waiting}};
+
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+terminate(_Reason, State) ->
+    ets:foldl(fun(#channel{pid=P}, _) -> smoosh_channel:close(P) end, 0,
+        State#state.tab),
+    ok.
+
+code_change(_OldVsn, {state, DbChannels, ViewChannels, Tab,
+        EventListener, Waiting}, _Extra) ->
+    {ok, #state{db_channels=DbChannels, view_channels=ViewChannels,
+        schema_channels=[], tab=Tab, event_listener = EventListener,
+            waiting=Waiting}};
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% private functions.
+
+get_channel_status(#channel{name=Name, pid=P}, Acc0) when is_pid(P) ->
+    try gen_server:call(P, status) of
+    {ok, Status} ->
+        [{Name, Status} | Acc0];
+    _ ->
+        Acc0
+    catch _:_ ->
+        Acc0
+    end;
+get_channel_status(_, Acc0) ->
+    Acc0.
+
+start_event_listener() ->
+    couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]).
+
+enqueue_request(State, Object) ->
+    try
+        case find_channel(State, Object) of
+        false ->
+            ok;
+        {ok, Pid, Priority} ->
+            smoosh_channel:enqueue(Pid, Object, Priority)
+        end
+    catch Class:Exception ->
+        Stack = erlang:get_stacktrace(),
+        couch_log:notice("~s: ~p ~p for ~s : ~p",
+            [?MODULE, Class, Exception,
+                smoosh_utils:stringify(Object), Stack])
+    end.
+
+find_channel(#state{}=State, {schema, DbName, GroupId}) ->
+    find_channel(State#state.tab, State#state.schema_channels, {schema, DbName, GroupId});
+find_channel(#state{}=State, {Shard, GroupId}) ->
+    find_channel(State#state.tab, State#state.view_channels, {Shard, GroupId});
+find_channel(#state{}=State, DbName) ->
+    find_channel(State#state.tab, State#state.db_channels, DbName).
+
+find_channel(_Tab, [], _Object) ->
+    false;
+find_channel(Tab, [Channel|Rest], Object) ->
+    Pid = channel_pid(Tab, Channel),
+    LastUpdated = smoosh_channel:last_updated(Pid, Object),
+    Staleness = 6.0e7 * list_to_integer(config:get("smoosh", "staleness", "5")),
+    case LastUpdated =:= false orelse
+        timer:now_diff(now(), LastUpdated) > Staleness of
+    true ->
+        case smoosh_utils:ignore_db(Object) of
+        true ->
+            find_channel(Tab, Rest, Object);
+        _ ->
+            case get_priority(Channel, Object) of
+            0 ->
+                find_channel(Tab, Rest, Object);
+            Priority ->
+                {ok, Pid, Priority}
+            end
+        end;
+    false ->
+        find_channel(Tab, Rest, Object)
+    end.
+
+channel_pid(Tab, Channel) ->
+    [#channel{pid=Pid}] = ets:lookup(Tab, Channel),
+    Pid.
+
+create_missing_channels(State) ->
+    create_missing_channels(State#state.tab, State#state.db_channels),
+    create_missing_channels(State#state.tab, State#state.view_channels),
+    create_missing_channels(State#state.tab, State#state.schema_channels),
+    State.
+
+create_missing_channels(_Tab, []) ->
+    ok;
+create_missing_channels(Tab, [Channel|Rest]) ->
+    case ets:lookup(Tab, Channel) of
+        [] ->
+            {ok, Pid} = smoosh_channel:start_link(Channel),
+            true = ets:insert(Tab, [#channel{name=Channel, pid=Pid}]);
+        _ ->
+            ok
+    end,
+    create_missing_channels(Tab, Rest).
+
+get_priority(Channel, {Shard, GroupId}) ->
+    case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
+    {ok, Pid} ->
+        try
+            {ok, ViewInfo} = couch_index:get_info(Pid),
+            {SizeInfo} = couch_util:get_value(sizes, ViewInfo),
+            DiskSize = couch_util:get_value(file, SizeInfo),
+            ActiveSize = couch_util:get_value(active, SizeInfo),
+            NeedsUpgrade = needs_upgrade(ViewInfo),
+            get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade)
+        catch
+            exit:{timeout, _} ->
+                0
+        end;
+    {not_found, _Reason} ->
+        0;
+    {error, Reason} ->
+        couch_log:warning("Failed to get group_pid for ~p ~p ~p: ~p",
+            [Channel, Shard, GroupId, Reason]),
+        0
+    end;
+
+get_priority(Channel, {schema, DbName, DDocId}) ->
+    case couch_md_index_manager:get_group_pid(DbName, DDocId) of
+    {ok, Pid} ->
+        {ok, SchemaInfo} = couch_md_index:get_info(Pid),
+        DiskSize = couch_util:get_value(disk_size, SchemaInfo),
+        DataSize = couch_util:get_value(data_size, SchemaInfo),
+        get_priority(Channel, DiskSize, DataSize, false);
+    {error, Reason} ->
+        couch_log:warning("Failed to get group_pid for ~p ~p ~p: ~p",
+            [Channel, DbName, DDocId, Reason]),
+        0
+    end;
+
+get_priority(Channel, DbName) when is_list(DbName) ->
+    get_priority(Channel, ?l2b(DbName));
+get_priority(Channel, DbName) when is_binary(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try get_priority(Channel, Db) after couch_db:close(Db) end;
+get_priority(Channel, Db) ->
+    {ok, DocInfo} = couch_db:get_db_info(Db),
+    {SizeInfo} = couch_util:get_value(sizes, DocInfo),
+    DiskSize = couch_util:get_value(file, SizeInfo),
+    ActiveSize = couch_util:get_value(active, SizeInfo),
+    NeedsUpgrade = needs_upgrade(DocInfo),
+    case db_changed(Channel, DocInfo) of
+        true  -> get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade);
+        false -> 0
+    end.
+
+get_priority(Channel, DiskSize, DataSize, NeedsUpgrade) ->
+    Priority = get_priority(Channel),
+    MinSize = to_number(Channel, "min_size", "1048576"),
+    MaxSize = to_number(Channel, "max_size", "infinity"),
+    DefaultMinPriority = case Priority of "slack" -> "16777216"; _ -> "5.0" end,
+    MinPriority = to_number(Channel, "min_priority", DefaultMinPriority),
+    MaxPriority = to_number(Channel, "max_priority", "infinity"),
+    if Priority =:= "upgrade", NeedsUpgrade ->
+            1;
+       DiskSize =< MinSize ->
+            0;
+       DiskSize > MaxSize ->
+            0;
+       DataSize =:= 0 ->
+            MinPriority;
+       Priority =:= "ratio", DiskSize/DataSize =< MinPriority ->
+            0;
+       Priority =:= "ratio", DiskSize/DataSize > MaxPriority ->
+            0;
+       Priority =:= "ratio" ->
+            DiskSize/DataSize;
+       Priority =:= "slack", DiskSize-DataSize =< MinPriority ->
+            0;
+       Priority =:= "slack", DiskSize-DataSize > MaxPriority ->
+            0;
+       Priority =:= "slack" ->
+            DiskSize-DataSize;
+       true ->
+            0
+    end.
+
+db_changed(Channel, Info) ->
+    case couch_util:get_value(compacted_seq, Info) of
+        undefined ->
+            true;
+        CompactedSeq ->
+            MinChanges = list_to_integer(
+                smoosh_utils:get(Channel, "min_changes", "0")),
+            UpdateSeq = couch_util:get_value(update_seq, Info),
+            UpdateSeq - CompactedSeq >= MinChanges
+    end.
+
+to_number(Channel, Name, Default) ->
+    case smoosh_utils:get(Channel, Name, Default) of
+        "infinity" -> infinity;
+        Value ->
+            try
+                list_to_float(Value)
+            catch error:badarg ->
+                list_to_integer(Value)
+            end
+    end.
+
+get_priority("ratio_dbs") ->
+    "ratio";
+get_priority("ratio_views") ->
+    "ratio";
+get_priority("ratio_schemas") ->
+    "ratio";
+get_priority("slack_dbs") ->
+    "slack";
+get_priority("slack_views") ->
+    "slack";
+get_priority("slack_schemas") ->
+    "slack";
+get_priority("upgrade_dbs") ->
+    "upgrade";
+get_priority("upgrade_views") ->
+    "upgrade";
+get_priority(Channel) ->
+    smoosh_utils:get(Channel, "priority", "ratio").
+
+needs_upgrade(Props) ->
+    DiskVersion = couch_util:get_value(disk_format_version, Props),
+    case couch_util:get_value(engine, Props) of
+        couch_bt_engine ->
+            (couch_bt_engine_header:latest(DiskVersion) =:= false);
+        _ ->
+            false
+    end.
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+setup() ->
+    meck:new([config, couch_index, couch_index_server], [passthrough]),
+    Pid = list_to_pid("<0.0.0>"),
+    meck:expect(couch_index_server, get_index, 3, {ok, Pid}),
+    meck:expect(config, get, fun(_, _, Default) -> Default end),
+    Shard = <<"shards/00000000-1fffffff/test.1529510412">>,
+    GroupId = <<"_design/ddoc">>,
+    {ok, Shard, GroupId}.
+
+
+teardown(_) ->
+    meck:unload().
+
+
+get_priority_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            fun t_ratio_view/1,
+            fun t_slack_view/1,
+            fun t_no_data_view/1,
+            fun t_below_min_priority_view/1,
+            fun t_below_min_size_view/1,
+            fun t_timeout_view/1,
+            fun t_missing_view/1,
+            fun t_invalid_view/1
+        ]
+}.
+
+
+t_ratio_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 5242880}, {active, 524288}]}}]}
+        end),
+        ?assertEqual(10.0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_slack_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 33554432}, {active, 16777215}]}}]}
+        end),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(16777217, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_no_data_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 5242880}, {active, 0}]}}]}
+        end),
+        ?assertEqual(5.0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(16777216, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(5.0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_below_min_priority_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 5242880}, {active, 1048576}]}}]}
+        end),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_below_min_size_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 1048576}, {active, 512000}]}}]}
+        end),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_timeout_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            exit({timeout, get_info})
+        end),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_missing_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index_server, get_index, 3, {not_found, missing}),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_invalid_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index_server, get_index, 3, {error, undef}),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+
+-endif.
diff --git a/src/smoosh_sup.erl b/src/smoosh_sup.erl
new file mode 100644
index 0000000..158498c
--- /dev/null
+++ b/src/smoosh_sup.erl
@@ -0,0 +1,38 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+    {ok, { {one_for_one, 5, 10}, [?CHILD(smoosh_server, worker)]} }.
diff --git a/src/smoosh_utils.erl b/src/smoosh_utils.erl
new file mode 100644
index 0000000..78ef83a
--- /dev/null
+++ b/src/smoosh_utils.erl
@@ -0,0 +1,58 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_utils).
+-include_lib("couch/include/couch_db.hrl").
+
+-export([get/2, get/3, group_pid/1, split/1, stringify/1, ignore_db/1]).
+
+group_pid({Shard, GroupId}) ->
+    case couch_view_group:open_db_group(Shard, GroupId) of
+    {ok, Group} ->
+        try
+            gen_server:call(couch_view, {get_group_server, Shard, Group})
+        catch _:Error ->
+            {error, Error}
+        end;
+    Else ->
+        Else
+    end.
+
+get(Channel, Key) ->
+    ?MODULE:get(Channel, Key, undefined).
+
+get(Channel, Key, Default) ->
+    config:get("smoosh." ++ Channel, Key, Default).
+
+split(CSV) ->
+    re:split(CSV, "\\s*,\\s*", [{return,list}, trim]).
+
+stringify({DbName, GroupId}) ->
+    io_lib:format("~s ~s", [DbName, GroupId]);
+stringify({schema, DbName, GroupId}) ->
+    io_lib:format("schema: ~s ~s", [DbName, GroupId]);
+stringify(DbName) ->
+    io_lib:format("~s", [DbName]).
+
+ignore_db({DbName, _GroupName}) ->
+    ignore_db(DbName);
+ignore_db(DbName) when is_binary(DbName)->
+    ignore_db(?b2l(DbName));
+ignore_db(DbName) when is_list(DbName) ->
+    case config:get("smoosh.ignore", DbName, false) of
+    "true" ->
+        true;
+    _ ->
+        false
+    end;
+ignore_db(Db) ->
+    ignore_db(couch_db:name(Db)).