Posted to commits@couchdb.apache.org by ch...@apache.org on 2019/03/13 19:17:46 UTC

[couchdb-ioq] 17/21: Import Cloudant's full IOQ code

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 907d196c30e3602fbd90014d9297d11c3a807b7d
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Jan 30 12:17:08 2019 +0000

    Import Cloudant's full IOQ code
---
 IOQ2.md                     |  812 ++++++++++++++++++++++++++++++++
 README.md                   |   34 ++
 include/ioq.hrl             |   71 +++
 operator_guide.md           |  231 ++++++++++
 priv/stats_descriptions.cfg |  230 ++++++++++
 rebar.config                |   11 +
 src/ioq.app.src             |   11 +-
 src/ioq.erl                 |  176 ++-----
 src/ioq_app.erl             |    1 +
 src/ioq_config.erl          |  314 +++++++++++++
 src/ioq_config_listener.erl |   54 +++
 src/ioq_kv.erl              |  169 +++++++
 src/ioq_osq.erl             |  253 ++++++++++
 src/ioq_server.erl          |  608 ++++++++++++++++++++++++
 src/ioq_server2.erl         | 1068 +++++++++++++++++++++++++++++++++++++++++++
 src/ioq_sup.erl             |   27 +-
 test/ioq_config_tests.erl   |  157 +++++++
 test/ioq_kv_tests.erl       |  149 ++++++
 test/ioq_tests.erl          |   68 +++
 19 files changed, 4310 insertions(+), 134 deletions(-)

diff --git a/IOQ2.md b/IOQ2.md
new file mode 100644
index 0000000..ddf8d22
--- /dev/null
+++ b/IOQ2.md
@@ -0,0 +1,812 @@
+# IOQ2 Overview
+
+IOQ2 is a replacement for the original IOQ, with the core motivation to create
+a faster IOQ that eliminates the need for IOQ bypasses. This is achieved with
+two primary approaches:
+
+  1. Faster data structures
+  2. More processes
+
+IOQ2 also provides a more capable configuration system allowing
+(de)prioritization at the class, user, and shard/class levels. This means you
+can do things like bumping up global compaction priority, deprioritizing a
+problematic MT user, and bumping up view build priority on an individual shard.
+
+
+## Faster Data Structures
+
+One of the issues with IOQ1 is that it uses standard Erlang list and queue data
+structures for high volume modifications to the request queues and for tracking
+active requests. This quickly becomes a bottleneck, and in high throughput
+scenarios it results in IOQ1 being roughly an order of magnitude slower than
+bypassing it.
+
+We work around this issue in IOQ2 by introducing the `hqueue` data structure,
+an Erlang NIF wrapper around a simple mutable priority queue that only does
+in-place updates. This queue is minimal in functionality and only supports
+floating point values as the priority sorting mechanism. As a result, IOQ2
+focuses on a multiplicative prioritization scheme using chained multipliers,
+allowing different weights for classes, users, and shard/class pairs. This
+prioritization scheme can easily be extended later on to include additional
+attributes, such as CCM tier, or perhaps a feedback loop providing
+prioritization based on overall request volumes/cost per user.
+
+
+## More Processes
+
+The introduction of `hqueue` has a considerable impact on IOQ and more than
+doubles the throughput of IOQ1. However, as mentioned above, IOQ1 bypasses can
+result in an order of magnitude performance difference, so clearly faster data
+structures alone are not sufficient to solve the problem. The main issue is
+that an Erlang process can only go as fast as a single CPU core will allow, so
+as we continually add more CPU cores to production systems, a single process
+IOQ becomes even more of a problem.
+
+Having all requests funnel through a single Erlang process will
+inevitably become a bottleneck. That said, the entire point of IOQ is that it
+*IS* a bottleneck! If there are no queued requests, then there's nothing to
+prioritize, making the use of IOQ questionable at best. The balancing act here
+is getting as close to IOQ bypass performance as we can while inducing enough
+of a bottleneck to effectively be able to prioritize different types of work.
+
+IOQ2 uses a set of IOQ2 processes to achieve similar levels of performance as
+an IOQ bypass. It creates a set of named `ioq_server_$N` processes, one for each
+Erlang scheduler in the VM. The caller requests are then dispatched to the
+appropriate IOQ2 server based on the current scheduler of the caller. Overall
+this works quite well and seems to be an effective way of ensuring sufficient
+request volume to have a queue backlog to prioritize, while also spreading out
+load as the Erlang VM spreads work across more schedulers, as needed. There is
+an experimental option to bind the IOQ2 pids to the relevant schedulers, but so
+far this has not shown conclusive improvements during limited testing. Another
+potential approach here is to randomize requests to the different IOQ2 pids.
+More fine grained testing with a sharp eye on latency distributions would be
+useful here.
+
+
+# The (h)queue
+
+The new queue in IOQ2 is the `hqueue` NIF, which is a mutable heap based
+max priority queue that prioritizes on a single floating point value. What this
+means in practice is that every request gets a numeric floating point priority
+value, and is then thrown into the queue until it is popped as the max value.
+The result is a priority queue data structure that inserts new items in
+O(log(N)) and also extracts the maximum item in O(log(N)), resulting in a very
+performant data structure for the application at hand.
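+
+To make the behavior concrete, here is a minimal sketch of exercising such a
+queue from a remsh. The exact `hqueue` arities and return values shown here are
+assumptions for illustration rather than documented API:
+
+```erlang
+%% Illustrative only: assumed hqueue API of new/0, insert/3 and extract_max/1.
+{ok, Q} = hqueue:new(),
+hqueue:insert(Q, 1.0, {interactive, req_a}),
+hqueue:insert(Q, 0.0001, {db_compact, req_b}),
+hqueue:insert(Q, 3.7, {interactive, req_c}),
+%% The maximum priority item is popped first.
+{3.7, {interactive, req_c}} = hqueue:extract_max(Q).
+```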
+
+
+## Sample Prioritizations
+
+The current prioritization scheme is a simple set of multipliers for the
+various dimensions. Currently there are three dimensions:
+
+  * Class: eg `interactive`, `db_compact`, `view_update`, etc
+  * User: the user making the request, eg `<<"foo">>`
+  * Shard/Class pair: the shard and class for the request
+    - eg `{<<"shards/00000000-1fffffff/foo">>, interactive}`
+    - this allows for things like increased compaction priority on an
+      individual shard outside of the global class multipliers
+
+Behind the scenes, this basically works as follows:
+
+```erlang
+prioritize_request(Req) ->
+  UserPriority = user_priority(Req),
+  ClassPriority = class_priority(Req),
+  ShardClassPriority = shard_class_priority(Req),
+
+  UserPriority * ClassPriority * ShardClassPriority.
+```
+
+The default priority is the identity priority, 1.0, so in the case where no
+values are defined the product above would be 1.0 * 1.0 * 1.0 = 1.0. The
+default class priorities are currently defined as follows in `ioq.hrl`:
+
+
+### Default Class Priorities
+
+```erlang
+-define(DEFAULT_CLASS_PRIORITIES, [
+    {customer, 1.0},
+    {internal_repl, 0.001},
+    {view_compact, 0.0001},
+    {db_compact, 0.0001},
+    {low, 0.0001},
+    {db_meta, 1.0},
+
+    {db_update, 1.0},
+    {view_update, 1.0},
+    {other, 1.0},
+    {interactive, 1.0}
+]).
+```
+
+
+## Handling Priority Starvation
+
+One potential problem with the simple floating point based priority queue is
+that lower priority items can become starved given a constant volume of higher
+priority requests. We want to ensure that requests can't get permanently
+starved in this manner, and that work progresses on all fronts in a timely
+fashion. IOQ1 handles this issue by ensuring there's always a random chance low
+priority work will be executed.
+
+In IOQ2 this issue is handled by way of an auto scaling elevator on the
+priority values. What this means is that every `N` requests, IOQ2 will scale
+the existing queued items by a configurable scaling factor. The idea is that
+you automatically increase the priority of all queued items, and if you do that
+enough times then lower priority items will eventually bubble up to the top.
+Behind the scenes hqueue is an array-based heap, so we can easily run through
+the array and update the priority of each item. By scaling the priority of each
+item linearly, we preserve the heap's ordering invariant and can accomplish
+this without needing to re-sort the heap.
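+
+The order-preserving property of that linear rescale can be seen with a trivial
+snippet (plain Erlang, purely illustrative):
+
+```erlang
+%% Scaling every priority by the same positive factor preserves relative
+%% ordering, so the heap never needs to be re-sorted.
+Priorities = [0.0001, 0.5, 1.0, 3.7],
+Scaled = [P * 2.0 || P <- Priorities],
+true = (lists:sort(Scaled) =:= [P * 2.0 || P <- lists:sort(Priorities)]).
+```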
+
+The default scale factor is currently `2.0`, and the default `N` value for how
+often to scale is currently every `1000` requests. Both of these values are
+config knobs. In general these values seem _ok_, but they're not particularly
+scientific, so we'll want to keep an eye on them over time in a variety of
+workloads.
+
+
+## Intentional Priority Starvation
+
+Initially, IOQ2 and hqueue required all priorities to be greater than zero, but
+this has been switched to be greater than or equal to zero. The motivation here
+is that 0.0 has the cute property of propagating through any multipliers. This
+means a zero value for any of the dimensions will make the other dimensions
+irrelevant. But what's even more interesting is that zero priority values skip
+the auto scaling elevator and will forever be stuck at zero, which provides a
+way to intentionally starve particular work types, or at the very least to
+ensure that they will never be selected unless there is no other work to do.
+This is especially useful for blackballing problematic MT users, or marking a
+particular database as background only work.
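+
+For example, using the config helpers described later in this document (the
+user name here is purely illustrative):
+
+```erlang
+%% Requests from this user will only be serviced when nothing else is queued,
+%% since a 0.0 priority never benefits from the auto scaling elevator.
+ioq_config:set_user_config(<<"noisy_user">>, 0.0, "Background only for noisy_user").
+```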
+
+
+---
+
+
+# IOQ2 Operations Guide
+
+IOQ2 comes with a feature toggle, and is disabled by default. You can enable it
+with:
+
+
+## Enable IOQ2
+
+```erlang
+ioq_config:set_enabled(true, "Enabling IOQ2").
+```
+
+You can verify it's enabled by checking:
+
+```erlang
+ioq:ioq2_enabled().
+```
+
+
+## Metrics Dashboard
+
+IOQ2 has a dedicated dashboard page on `metrics.cloudant.com` with the
+expected tab name of `IOQ2`.
+
+
+---
+
+
+## Choosing Good Priority Values
+
+Choosing good priority values is going to be a crucial part of tuning IOQ2.
+Unfortunately this is not particularly obvious nor necessarily easy, and it will
+take some experimentation under different workloads to begin establishing some
+best practices. Hopefully folks can start updating this section with some
+useful tips and tricks for different workloads. Below there's more
+documentation on how to validate the priority configs for different request
+types. The primary motivation for adding that logic was to help facilitate
+experimentation of different configuration options, and to aid in understanding
+how the different configurations impact request prioritization.
+
+It will be useful to keep in mind the ranges of priority values. By default all
+interactive/view_update/db_update/other class requests have a class multiplier
+of `1.0`, so assuming no user specific or shard specific configs, those requests
+will have a priority of `1.0`. Standard background tasks have much lower
+defaults: `0.0001` for compaction and low priority work, and `0.001` for
+internal replication. So primary database operations by default have a thousand
+to ten thousand fold prioritization over background tasks. The default bounds of
+prioritization are from `0.0` to `10000.0`, so you have a decent field to
+experiment with, and the upper bound can be configured higher or lower as
+desired.
+
+It's also important to remember the auto scaling elevator logic for prioritized
+requests. Every `N` requests all currently queued requests have their
+priorities linearly scaled, so after a sufficiently long time in the queue, all
+requests (with non `0.0` priorities) will eventually become the top priority
+(assuming constant priorities coming in). The scaling factor can be configured
+as well as how often to do the scaling.
+
+So let's look at some real world scenarios where you would want to change
+prioritization. Let's start with a simple one: what to do when a node is at 95%
+disk space? This is an easy one! Just blast compaction priority. Unlike IOQ1,
+IOQ2 does not differentiate between request types in terms of selecting the
+next work item; it's strictly based on the priority multiplier, so you can
+completely prioritize compaction traffic over standard database operations,
+potentially to the detriment of standard operations performance. So if you set
+the compaction priority multiplier to 10000 you'll prioritize compaction work
+above everything else (assuming default priorities elsewhere). This means that
+as long as there are compaction jobs in the queue those will be handled before
+anything else. This should be a significant win for prioritizing compaction in
+low disk space scenarios.
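+
+Using the class config helper described below, that might look like the
+following (the reason string here is illustrative):
+
+```erlang
+ioq_config:set_class_config(db_compact, 10000.0, "Node at 95% disk, blast compaction").
+```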
+
+Now, let's look at a similar, albeit more complicated scenario. Disk space is
+above 85%, and you want to get out ahead of the compaction curve without
+severely impacting cluster performance for standard operations. Cranking
+compaction to the limit will get the job done, but it will also potentially
+induce performance issues for the customer and could starve normal requests.
+Here you would want to experiment a bit with gradually bumping up the
+compaction priority. Try going from the `0.0001` default to `0.001` and see how
+much that increases compaction throughput. Still not enough? Try `0.01` and
+repeat. Then on to `0.1` and maybe even on to `1.0` to make compaction priority
+level with normal operations.
+
+One other thing to keep in mind here is that these prioritizations are also
+dependent on the overall request volumes. If you've got 10k pending db_update
+requests, and only 5 pending compaction requests, then cranking compaction
+priority is not going to have a massive negative impact on db_update
+throughput. Similarly, if you've only got one compaction job running, you'll
+run into diminishing returns for how effectively you can prioritize compaction
+as there's insufficient request volume to prioritize. You'll need to experiment
+with increasing Smoosh concurrency to get more jobs running to have more queued
+items to prioritize.
+
+
+## Setting Priority Values
+
+IOQ2 contains utility functions for setting all configuration related values;
+you should *not* need to use `config:set` for any IOQ2 related configuration
+changes. In addition to the various config tunables, there are builtin helpers
+to assist with setting appropriate priorities. All priority values should be
+floating point values, and the described config helpers will prevent you from
+using non floating point values. All of the config helpers here expect a
+`Reason` value to log changes as per standard auditing rules.
+
+*NOTE* the shard priorities do not include the shard suffix in the config names
+to preserve priorities between cycled dbs, so if for whatever reason you
+manually set shard priorities, make sure you use `filename:rootname(ShardName)`
+to drop the suffix so that your config options work as expected.
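+
+For example (the shard name here is just illustrative):
+
+```erlang
+ShardName = <<"shards/00000000-1fffffff/foo.1503945430">>,
+filename:rootname(ShardName).
+%% => <<"shards/00000000-1fffffff/foo">>
+```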
+
+
+### Setting Class Priority Values
+
+Class specific priority multipliers can be set as demonstrated below. The class
+name should be an Erlang atom, and the value should be a float. For example:
+
+```erlang
+ioq_config:set_class_config(interactive, 3.2, "Insightful motivation").
+ioq_config:set_class_config(db_compact, 0.5, "Moar compactions").
+```
+
+
+### Setting Shard/Class Specific Priority Values
+
+You can prioritize on Shard/Class pairs; there is no shard-wide prioritization,
+so you'll need to set each class as appropriate. This function takes a
+`#shard{}` record as returned by `mem3:shards`.
+
+```erlang
+Shard = hd(mem3:shards(DbName)),
+ioq_config:set_shard_config(Shard, db_update, 2.3, "Prioritize db updates").
+```
+
+You _could_ call `set_shard_config` for every shard for a given database, but
+there's a helper for that as well:
+
+
+### Setting Shard/Class Priority for all Shards in a Database
+
+There's a helper function for setting a class priority on all shards for a
+given database name. Similarly to the shard/class configs, you have to specify
+each class priority individually. You can use it as follows:
+
+```erlang
+ioq_config:set_db_config(<<"foo/bar">>, view_update, 0.8, "Build that view").
+```
+
+This is roughly equivalent to:
+
+```erlang
+[set_shard_config(S, Class, Value, Reason) || S <- mem3:shards(DbName)].
+```
+
+
+### Setting User Specific Priority Values
+
+You can set a global multiplier for a particular user to increase or decrease
+the priority of all of their requests. Here's an example:
+
+```erlang
+ioq_config:set_user_config(<<"foo">>, 3.7, "Priority user").
+```
+
+The `set_user_config` function currently does not validate that the user
+exists, so you'll want to verify that you set the config for the appropriate user.
+
+
+### Verifying Expected Prioritization Matches up with Reality
+
+Ok great, so you've just used the handy helpers to set various priority values,
+but how do you verify it did what you expect? How do you test to see that the
+multipliers result in a prioritization in the desired range? The prioritization
+logic is self contained and can easily be experimented with. You have two
+options, either with the `check_priority/3` function, or by using the
+`prioritize` function directly. The `check_priority` function is the simple
+approach, and precludes you from having to build up the relevant priority data
+structures. It can be used as follows:
+
+```erlang
+User = <<"foo">>,
+DbName = <<"foo/bar">>,
+Shard = hd(mem3:shards(DbName)),
+ioq_config:check_priority(internal_repl, User, Shard).
+```
+
+That will return the floating point prioritization value for that request. You
+can also experiment with your own config options directly by using the
+`ioq_config:prioritize` function. To demonstrate, here is the source code for
+the `check_priority` function used above:
+
+```erlang
+-spec check_priority(atom(), binary(), binary()) -> float().
+check_priority(Class, User, Shard0) ->
+    {ok, ClassP} = build_class_priorities(),
+    {ok, UserP} = build_user_priorities(),
+    {ok, ShardP} = build_shard_priorities(),
+
+    Shard = filename:rootname(Shard0),
+    Req = #ioq_request{
+        user = User,
+        shard = Shard,
+        class = Class
+    },
+
+    prioritize(Req, ClassP, UserP, ShardP).
+```
+
+The `build_*_priorities()` functions are all exported from the `ioq_config`
+module and are directly usable for easy testing. You can also see the full list
+of priority values from those priority data structures like so:
+
+```erlang
+(node1@127.0.0.1)14> khash:to_list(ShardP).
+[{{<<"shards/00000000-1fffffff/foo">>,interactive},1.0e3},
+ {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update},
+   1.5}]
+```
+
+
+---
+
+
+## Other Configuration Knobs
+
+There are a number of other configuration knobs available and they're detailed
+below.
+
+
+### IOQ2 Concurrency
+
+We'll start out with one of the more awkward configuration tunables:
+concurrency! This option is awkward because it fundamentally changes the
+dynamics of IOQ, both IOQ1 and IOQ2. If concurrency is higher than the number
+of parallel requests, then you'll never actually prioritize things and using
+IOQ is a waste. If it's too low then you'll overly bottleneck the system and
+cause a backup of requests.
+
+This awkwardness is further compounded by the fact that IOQ2 is inherently
+concurrent in that it has one IOQ2 pid per scheduler, so you must exercise
+caution with setting the concurrency value! This value is propagated to every
+IOQ2 pid, so setting concurrency is essentially multiplicative, and total
+concurrency will be `concurrency * num_schedulers`. Most of our newer systems
+now have 48 cores, and we now have systems with 56 cores, so setting IOQ2
+concurrency to 5 could result in a total concurrency of 250+! The
+`ioq:get_disk_concurrency()` function (which calls
+`ioq_server2:get_concurrency()` when IOQ2 is enabled) will aggregate these
+concurrency values together, giving you the full total, so that's useful to
+double check.
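+
+For example, on a 48 core machine with the default concurrency of one per pid
+(the shell output below is illustrative):
+
+```erlang
+(node1@127.0.0.1)1> ioq:get_disk_concurrency().
+49
+```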
+
+Interestingly enough, the best concurrency value found so far through empirical
+means is concurrency=1 per IOQ2 pid! This is the current default, and so on
+machines with 48 cores we end up with a total concurrency of 49. So far the
+default of one has been fairly effective, but given sufficient volume of
+requests it might be worthwhile to bump it up. Start small and try bumping
+it up to two, or maybe three. For example:
+
+```erlang
+ioq_config:set_concurrency(2, "Bumping concurrency").
+```
+
+*NOTE* if you do feel the need to update concurrency, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Resize Limit
+
+The resize limit value controls how many requests to handle before triggering
+the auto scaling elevator logic described above. This defaults to 1000, and can
+be changed with:
+
+```erlang
+ioq_config:set_resize_limit(5000, "Huge resize limit test").
+```
+
+*NOTE* if you do feel the need to update resize_limit, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Scale Factor
+
+The scale factor defines the multiplier to use during auto scaling when the
+resize limit is hit. This currently defaults to two, which means every ten
+auto scale iterations you'll have increased the priority one thousand fold for
+any requests that have been in the queue for all ten cycles. This value may or
+may not be too aggressive. Setting this to one essentially eliminates the auto
+scaling elevator logic entirely, which is not really recommended. You can
+update it as follows:
+
+```erlang
+ioq_config:set_scale_factor(1.7, "Modifying scale factor").
+```
+
+*NOTE* if you do feel the need to update scale_factor, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Max Priority
+
+Max priority establishes an upper bound on the priority values. It currently
+defaults to 10000.0. There is also an implicit lower bound on priority values
+of 0.0. Depending on how wild you go with the multipliers, it might be useful
+to increase this value, which can be done with the following:
+
+```erlang
+ioq_config:set_max_priority(55555.0, "Expand priority space").
+```
+
+### IOQ2 Dedupe
+
+Both IOQ1 and IOQ2 have a dedupe feature that will avoid performing the same
+read multiple times in parallel. In IOQ1 this operation scanned through lists
+and could become a considerable resource hog. In IOQ2 this is a simple khash
+lookup and should not be a problem. You should *not* ever need to disable this.
+If for whatever reason you need to, you can do so with:
+
+```erlang
+ioq_config:set_dedupe(false, "Disable dedupe test").
+```
+
+*NOTE* if you do feel the need to update dedupe, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Bypass
+
+IOQ2 has the same bypass logic as IOQ1; however, the whole point of IOQ2 is to
+make a sufficiently performant IOQ that bypasses are *not* necessary. This
+functionality was included in IOQ2 as a backup in case max throughput is
+essential and unreachable with IOQ2. You can set it in the standard manner, but
+in the `ioq2.bypass` namespace as follows:
+
+```erlang
+ioq_config:set_bypass(interactive, true, "Bypass interactive channel").
+```
+
+*NOTE* if you do feel the need to bypass IOQ2, please do notify
+@chewbranca afterwards so we can observe this in more workloads. Yes, this
+NOTE blurb is in a number of config descriptions, but _please_ do notify
+@chewbranca if you feel the need to bypass anything.
+
+
+### IOQ2 Dispatch Strategy
+
+IOQ2 utilizes many processes to achieve the desired throughput and performance.
+There are several different dispatch strategies for determining how requests are
+funneled through these IOQ pids, and a `single_server` fallback in the event
+only a single IOQ2 server is desired. Changing dispatch strategies is a *safe*
+operation to perform: all the pids already exist, and changing the strategy just
+toggles which ones requests are routed through. All active requests will continue
+to go through the IOQ2 pid they were initially handled by, and any new requests
+will go through the IOQ2 pid selected by the new strategy. The four current
+dispatch strategies are:
+
+  * "server_per_scheduler"
+  * "fd_hash"
+  * "random"
+  * "single_server"
+
+
+### Server per Scheduler Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("server_per_scheduler", "Changing dispatch)).
+```
+
+This is the default dispatch strategy. IOQ2 creates `N` `ioq_server2` pids, where
+`N` is the number of Erlang VM schedulers on the current system, which defaults
+to the number of CPU cores. This dispatch strategy uses the current
+scheduler of the caller process to determine which IOQ2 server to use. This has
+the nice property of automatically distributing work out across IOQ2 servers
+based on how the Erlang VM is spreading out work across the schedulers. In
+practice this works pretty well and seems reasonable at a high level, but it may
+or may not be optimal for all workloads, which is why we have multiple dispatch
+strategies.
+
+
+### FD Hash Scheduler Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("fd_hash", "Changing dispatch)).
+```
+
+The `fd_hash` dispatch strategy hashes on the couch_file pid the request has as
+a destination, and then ensures that all requests to the same couch_file pid go
+through a single IOQ2 pid. This provides the most control over prioritization of
+requests to individual shards, as _all_ requests to that shard will go through
+the single IOQ2 pid, providing global prioritization rather than prioritization
+localized to each IOQ2 pid. This can be useful when dealing with overloaded
+couch_file pids where you want to minimize and focus the work sent to those
+pids. Also, by funneling all requests to the same shard through the same IOQ2
+pid, this increases the
+opportunity for deduping requests, which can be significant. This dispatch
+strategy can result in uneven distribution of work across IOQ2 pids, so it's not
+appropriate for all situations, but for many dedicated clusters this could be an
+ideal dispatch strategy.
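+
+The core idea can be sketched as follows. This is purely illustrative and not
+the actual dispatch code in `ioq_server2`; it just shows how hashing a
+couch_file pid onto the fixed set of `ioq_server_$N` names keeps all requests
+for one file on one IOQ2 pid:
+
+```erlang
+%% Illustrative only: deterministically map a couch_file pid to a server name.
+server_for_fd(Fd, NumServers) ->
+    N = erlang:phash2(Fd, NumServers) + 1,
+    list_to_atom("ioq_server_" ++ integer_to_list(N)).
+```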
+
+
+### Random Dispatch Strategy
+
+
+```erlang
+ioq_config:set_dispatch_strategy("random", "Changing dispatch)).
+```
+
+The `random` dispatch strategy just randomly selects one of the IOQ2 pids to
+send the request to, picking a pid uniformly at random. This should result in a
+roughly even distribution of work across all IOQ2 pids. This is
+not the default strategy because if there are fewer concurrent requests active in
+the system than total IOQ2 pids, there will not actually be any prioritization
+taking place, in which case the `server_per_scheduler` dispatch strategy should
+be preferred as it will reduce the number of IOQ2 pids in use as a function of
+how much work is on the system.
+
+
+### Single Server Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("random", "Changing dispatch)).
+```
+
+This is a fallback dispatch strategy that may or may not be removed at some
+point. This utilizes a single IOQ2 pid for _all_ requests, eliminating the
+benefits of parallel IOQ2 pids and inevitably resulting in IOQ2 becoming a
+bottleneck in the same way as IOQ1, albeit a faster bottleneck.
+
+
+---
+
+
+## Finding the pids
+
+The IOQ2 pids per scheduler have registered names of the form `ioq_server_$N`
+where `$N` is the scheduler id, starting from 1. You can get a list of all the
+IOQ2 pids on the current system with the following:
+
+```erlang
+(node1@127.0.0.1)10> ioq_sup:ioq_server_pids().
+[ioq_server_1,ioq_server_2]
+```
+
+
+## ioq_config:ioq_classes
+
+You can see the proper names for all registered IOQ classes with the following:
+
+```erlang
+(node1@127.0.0.1)15> ioq_config:ioq_classes().
+[customer,internal_repl,view_compact,db_compact,low,db_meta,
+ db_update,view_update,other,interactive]
+```
+
+
+## ioq_server2:get_state
+
+You can see a human readable representation of the IOQ2 server state with the
+following block of code. The output is "human readable" in that the khash and
+hqueue data structures have been transformed into lists so the contents can be
+viewed. This fetches the state of the `ioq_server_1` pid. If you want a
+different pid you'll need to manually `gen_server:call` into it.
+
+```erlang
+(node1@127.0.0.1)16> ioq_server2:get_state().
+{state,[],[],[],1,0,
+       [{view_update,1.0},
+        {view_compact,0.0001},
+        {db_compact,0.0001},
+        {low,0.0001},
+        {db_update,1.0},
+        {customer,1.0},
+        {internal_repl,0.001},
+        {interactive,1.0},
+        {other,1.0},
+        {db_meta,1.0}],
+       [],
+       [{{<<"shards/00000000-1fffffff/foo">>,interactive},1.0e3},
+        {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update},
+         1.5}],
+       2.0,true,1000,1,ioq_server_1,0,normal,1.0e4}
+```
+
+If you want the pretty printed version and to be able to fetch the fields
+directly, you'll need to load the `ioq_server2` record definitions, for example:
+
+```erlang
+(node1@127.0.0.1)17> rr(ioq_server2), ioq_server2:get_state().
+#state{reqs = [],waiters = [],queue = [],concurrency = 1,
+       iterations = 0,
+       class_p = [{view_update,1.0},
+                  {view_compact,0.0001},
+                  {db_compact,0.0001},
+                  {low,0.0001},
+                  {db_update,1.0},
+                  {customer,1.0},
+                  {internal_repl,0.001},
+                  {interactive,1.0},
+                  {other,1.0},
+                  {db_meta,1.0}],
+       user_p = [],
+       shard_p = [{{<<"shards/00000000-1fffffff/foo">>,interactive},
+                   1.0e3},
+                  {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update},
+                   1.5}],
+       scale_factor = 2.0,dedupe = true,resize_limit = 1000,
+       next_key = 1,server_name = ioq_server_1,scheduler_id = 1,
+       collect_stats = normal,max_priority = 1.0e4}
+```
+
+To fetch the server state from a particular IOQ2 pid, you can do so with the
+following:
+
+```erlang
+(node1@127.0.0.1)18> gen_server:call(ioq_server_2, get_state).
+#state{reqs = [],waiters = [],queue = [],concurrency = 1,
+       iterations = 0,
+       class_p = [{view_update,1.0},
+                  {view_compact,0.0001},
+                  {db_compact,0.0001},
+                  {low,0.0001},
+                  {db_update,1.0},
+                  {customer,1.0},
+                  {internal_repl,0.001},
+                  {interactive,1.0},
+                  {other,1.0},
+                  {db_meta,1.0}],
+       user_p = [],
+       shard_p = [{{<<"shards/00000000-1fffffff/foo">>,interactive},
+                   1.0e3},
+                  {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update},
+                   1.5}],
+       scale_factor = 2.0,dedupe = true,resize_limit = 1000,
+       next_key = 1,server_name = ioq_server_2,scheduler_id = 2,
+       collect_stats = normal,max_priority = 1.0e4}
+```
+
+
+---
+
+
+## Gotchas
+
+Some miscellaneous "gotchas" to be aware of.
+
+
+### Shard Class Configs Drop Suffixes
+
+By default the shard names include the db file suffix, which is the timestamp
+of the database's creation time. These suffixes must be dropped from the config
+entries;
+otherwise they will not be picked up in the IOQ2 config. For instance, here's
+what the default name looks like, followed by properly truncating the suffix:
+
+```erlang
+(node1@127.0.0.1)21> S#shard.name.
+<<"shards/00000000-1fffffff/foo.1503945430">>
+(node1@127.0.0.1)22> filename:rootname(S#shard.name).
+<<"shards/00000000-1fffffff/foo">>
+```
+
+
+### Shard Configs Persist Through Db Cycles, but not Reshards
+
+In the `Shard Class Configs Drop Suffixes` "gotcha" above, you'll see that
+suffixes are not an allowed part of the shard config keys. The motivation here
+is to allow for database configs persisting through cycles, eg deleting and
+recreating a database will preserve the config.
+
+*HOWEVER*, if you delete the database and recreate it with a new sharding
+factor, you'll end up with a completely different set of shards, the old
+configs will no longer apply, and priorities will essentially reset to the defaults.
+This should be obvious given that the shard configs are keyed on the full shard
+name, so switching said shard name will result in a different config key.
+You'll need to manually reset the configs with the appropriate new shards once
+the database has been recreated or resharded.
+
+
+### Non integer/float Priority Configs are Ignored
+
+If you set a config value that is not an integer or floating point value, that
+configuration will be silently ignored and replaced with the default value of
+`1.0`. Check out the docs above about how to verify config expectations. The
+`ioq_config` helpers described above will only allow you to set configs with
+floating point values, so if you only use those this should never be a problem.
+However if you manually set the config values you might run into this. The
+IOQ2 config will attempt to convert integers to floats, but any other values
+will be ignored.
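+
+As a sketch of that behaviour (not the actual `ioq_config` code), a configured
+value is only usable if it is numeric:
+
+```erlang
+%% Illustrative only: floats pass through, integers are converted, and
+%% anything else falls back to the 1.0 default.
+to_priority(V) when is_float(V)   -> V;
+to_priority(V) when is_integer(V) -> float(V);
+to_priority(_)                    -> 1.0.
+```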
+
+
+### Only a subset of metrics reducers are enabled
+
+The https://metrics.cloudant.com tabs are predominantly powered by Centinela
+reducers that aggregate the node specific metrics into an individual global
+metric. Due to the current precarious state of the metrics stack, the IOQ2 work
+has been cautious with the introduction of new metrics. Only the reduced
+metrics currently enabled on the IOQ2 tab have reducers enabled. If you need
+additional reducers you'll need to add them. For example, the iowait metrics
+are reduced on median but not P99.9:
+
+```
+(chewbranca)-(jobs:0)-(~)
+(! 14529)-> acurl
+https://cloudant.cloudant.com/metrics/couchdb.io_queue2.iowait.percentile.50
+{"_id":"couchdb.io_queue2.iowait.percentile.50","_rev":"1-e0ad9472a61b9a46ace4c9852ce63a36","reduce":true,"reducers":["mean"]}
+
+(chewbranca)-(jobs:0)-(~)
+(! 14530)-> acurl
+https://cloudant.cloudant.com/metrics/couchdb.io_queue2.iowait.percentile.999
+{"error":"not_found","reason":"missing"}
+```
+
+
+### IOQ2 Concurrency is Multiplicative
+
+This is covered in depth in the concurrency sections above, but this is an
+important enough point to warrant calling it out here as well. With IOQ1
+concurrency is singular for the one IOQ1 pid, but in IOQ2 concurrency is set
+for *every* IOQ2 pid. This means you should realistically never set concurrency
+above single digits for IOQ2. Setting concurrency to five on a system with 56
+CPU cores will result in a total concurrency of over 250, which is probably not
+productive.
+
+
+### IOQ2 Can Only Prioritize Work when there's a Backlog of Work
+
+As mentioned above, IOQ is inherently a bottleneck; otherwise it isn't actually
+able to prioritize any work. On a similar note, if there's an insufficient
+volume of a particular request type, you won't be able to significantly
+influence the throughput of that type. For instance, you can bump compaction
+priority to the moon but if there's only one compaction job you're not going to
+make a significant difference to the volume of compaction requests.
+
+The other side of this is that IOQ2 is only effective at prioritizing work when
+there's a variety of work types. If you've got a problematic MT user that you
+want to back burner, you can set the user priority to `0.0` and all their
+requests will be prioritized at `0.0` and will never benefit from the auto
+scaling elevator. That said, if there are _no_ other users making requests to
+the system, then that user's requests will still be chugging along as fast as
+they come in. IOQ2 is *NOT* a rate limiting system; it's a prioritization
+system that prioritizes requests relative to all other pending requests.
+Without sufficient work it's essentially just a pass-through.
+
+
+---
+
+
+## Request for feedback
+
+IOQ and IOQ2 are complicated beasts, and there are a lot of tunable knobs here.
+It is expected that we'll need to experiment with different levels for
+different workloads, so please do be diligent about informing @chewbranca of
+situations where you've had to change any of the configuration options above
+that request notification. Any other thoughts/comments/suggestions are welcome
+as well, as is feedback on the IOQ2 metrics tab.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ccd676c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,34 @@
+### IOQ classes
+The following is the list of IOQ classes:
+
+* interactive
+* db_update
+* view_update
+* db_compact
+* view_compact
+* internal_repl
+* low
+
+
+### Bypassing IOQ
+One can configure an ioq bypass, which removes an IO class from prioritization,
+as below:
+
+    config:set("ioq.bypass", "view_update", "true")
+
+Note that setting an IOQ bypass can effectively trump all other classes,
+especially in the case of an interactive bypass v. compaction. This can lead
+to high disk usage.
+
+### Setting priorities
+The priority for a class can also be set as follows:
+
+    config:set("ioq", "compaction", "0.3")
+
+Or globally, using snippet/rpc:
+
+    s:set_config("ioq", "compaction", "0.314", global)
+    rpc:multicall(config, set, ["ioq", "compaction", "0.217"])
+
+As the interactive class is 'everything else' its priority cannot be directly
+set.
diff --git a/include/ioq.hrl b/include/ioq.hrl
new file mode 100644
index 0000000..55c95f8
--- /dev/null
+++ b/include/ioq.hrl
@@ -0,0 +1,71 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(DEFAULT_PRIORITY, 1.0).
+-define(BAD_MAGIC_NUM, -12341234).
+
+%% Dispatch Strategies
+-define(DISPATCH_RANDOM, "random").
+-define(DISPATCH_FD_HASH, "fd_hash").
+-define(DISPATCH_SINGLE_SERVER, "single_server").
+-define(DISPATCH_SERVER_PER_SCHEDULER, "server_per_scheduler").
+
+-define(DEFAULT_CLASS_PRIORITIES, [
+    {customer, 1.0},
+    {internal_repl, 0.001},
+    {view_compact, 0.0001},
+    {db_compact, 0.0001},
+    {low, 0.0001},
+    {db_meta, 1.0},
+
+    {db_update, 1.0},
+    {view_update, 1.0},
+    {other, 1.0},
+    {interactive, 1.0}
+]).
+
+
+-record(ioq_request, {
+    fd,
+    msg,
+    key,
+    init_priority = 1.0,
+    fin_priority,
+    ref,
+    from,
+    t0,
+    tsub,
+    shard,
+    user,
+    db,
+    class,
+    ddoc
+}).
+
+
+-type io_priority() :: db_compact
+    | db_update
+    | interactive
+    | internal_repl
+    | other
+    | customer
+    | db_meta
+    | low.
+-type view_io_priority() :: view_compact
+    | view_update.
+-type dbcopy_string() :: string(). %% "dbcopy"
+-type dbname() :: binary() | dbcopy_string().
+-type group_id() :: any().
+-type io_dimensions() :: {io_priority(), dbname()}
+    | {view_io_priority(), dbname(), group_id()}.
+-type ioq_request() :: #ioq_request{}.
+
diff --git a/operator_guide.md b/operator_guide.md
new file mode 100644
index 0000000..81acdcc
--- /dev/null
+++ b/operator_guide.md
@@ -0,0 +1,231 @@
+# An operator's guide to IOQ
+
+IOQ handles the prioritisation of IO operations in the database. It has
+two main responsibilities:
+
+ 1. Providing configurable prioritisation of interactive requests and
+    background requests such as compaction and internal replication.
+ 2. Providing equal prioritisation for interactive requests by backend/database.
+
+From an operational perspective point 1 is of most interest as it provides a set
+of levers that can be pulled to change the behaviour of the cluster in favour
+of particular workloads or operational concerns.
+
+## Basic overview
+
+From an operational point-of-view, IOQ carries out two fundamental operations:
+
+ 1. Enqueueing requests into one of a number of available channels.
+ 2. Selecting and submitting a request from the available channels according
+    to configured priorities.
+
+IOQ categorises IO requests by class and by priority. The class of a request
+dictates the channel into which it will be enqueued and the priority influences
+the probability that a given request will be dequeued and executed.
+
+The following table lists the IOQ classes and the corresponding priorities. Note
+that the mapping of IOQ classes to class priorities is not 1:1.
+
+```
+|---------------+---------------+--------------------------------------------|
+| IOQ class     | IOQ priority  | Description                                |
+|---------------+---------------+--------------------------------------------|
+| interactive   | reads, writes | IO requests related to requests made by    |
+|               |               | users via the http layer.                  |
+|               |               |                                            |
+| db_update     | writes        | Interactive IO requests which are database |
+|               |               | write operations.                          |
+|               |               |                                            |
+| view_update   | views         | IO requests related to view index builds.  |
+|               |               |                                            |
+| db_compact    | compaction    | IO requests related to database            |
+|               |               | compactions.                               |
+|               |               |                                            |
+| view_compact  | compaction    | IO requests related to view compactions.   |
+|               |               |                                            |
+| internal_repl | replication   | IO requests related to internal            |
+|               |               | replication.                               |
+|               |               |                                            |
+| low           | low           | IO requests related to requests made by    |
+|               |               | users via the http layer where the         |
+|               |               | "x-cloudant-priority: low" header is set.  |
+|               |               |                                            |
+| other         | undefined     | IO requests that do not fit any of the     |
+|               |               | above classes. This includes search IO     |
+|               |               | requests.                                  |
+|---------------+---------------+--------------------------------------------|
+```
+
+## Internals
+
+To understand the relationship between the IOQ classes and the IOQ priorities
+it is helpful to understand the channels into which IO requests are enqueued.
+
+IOQ uses the following four channels:
+
+ - `Compaction`
+ - `Internal replication`
+ - `Low`
+ - `Customer`
+
+The `Customer` channel is effectively a meta-channel where each item in the
+queue represents a backend/dbname combination that consists of a further three
+channels:
+
+ - `Interactive`
+ - `DB update`
+ - `View update`
+
+Requests are enqueued according to the following scheme:
+
+ - Requests with class `internal_repl`, `low`, `db_compact` or `view_compact`
+   are enqueued into the `Internal replication`, `Low` or `Compaction` channels
+   respectively (both compaction classes share the `Compaction` channel).
+ - Requests with class `interactive`, `db_update` or `view_update` are enqueued
+   into the `Interactive`, `DB update` or `View update` channel of the relevant
+   `Customer` channel for the backend/database combination.
+ - Requests with class `other` are enqueued into the `Interactive` queue of a
+   `Customer` channel reserved for `other` IOQ requests.
+
+Requests are submitted as follows:
+
+ - The next item is selected from either the `Compaction`,
+   `Internal replication`, `Low` or `Customer` channel according to the
+   configured priorities (`compaction`, `replication`, `low` and `customer`).
+ - If the item is obtained from the `Compaction`, `Internal replication` or
+   `Low` channels then the request is submitted for execution.
+ - If the item is obtained from the `Customer` channel then the request is
+   selected from either the `Interactive`, `DB update` or `View update` channel
+   according to the configured priorities (`reads`, `writes`, and `views`).
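+
+The channel selection can be pictured as a weighted random draw over the
+configured priorities. The sketch below is purely illustrative and is not the
+actual ioq_server code; it just shows how a higher configured priority
+translates into a higher probability of that channel being serviced next:
+
+    %% Illustrative only: pick a channel with probability proportional to its
+    %% configured priority, e.g. [{compaction, 0.001}, {replication, 0.0001},
+    %%                            {low, 0.0001}, {customer, 1.0}].
+    choose_channel(Channels) ->
+        Total = lists:sum([W || {_, W} <- Channels]),
+        pick(rand:uniform() * Total, Channels).
+
+    pick(R, [{Name, W} | _]) when R =< W -> Name;
+    pick(R, [{_, W} | Rest]) -> pick(R - W, Rest).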
+
+## Configuration
+
+Unless there is prior knowledge of the IOQ configuration required to support the
+intended workload of a cluster on a given hardware specification it is
+recommended that IOQ is initially left with the default configuration values. As
+more becomes known about the behaviour of a cluster under load the IOQ settings
+can be tuned to provide optimal performance for the production workload.
+
+Note that tuning IOQ is not the answer to all performance problems and there are
+a finite number of gains to be had (possibly zero). You should also be
+considering the total load on the cluster, the capabilities of the underlying
+hardware and the usage patterns and design of the applications which sit on top
+of the data layer.
+
+### Priorities
+
+IOQ ships with a default configuration which gives interactive reads/writes and
+view builds a high priority (`1.0`) and the background requests a much lower
+priority (`0.001` for compaction and `0.0001` for replication and low).
+
+You can set the priorities to other values using the config app in a remsh as
+follows:
+
+    config:set("ioq", "views", "0.5", "FBXXXXX reduce views IOQ priority").
+
+To return to the default value just delete the configuration value:
+
+    config:delete("ioq", "views", "FBXXXXX revert to default priority").
+
+The following sections describe typical situations where tuning IOQ priorities
+might be appropriate.
+
+#### Internal replication backlog
+
+If cluster nodes are frequently exhibiting an internal replication backlog
+then it might be worth increasing the `replication` priority.
+
+A backlog can be confirmed by checking the following graphite target:
+
+    net.cloudant.cluster001.db*.erlang.internal_replication_jobs
+
+If this value is consistently elevated by more than a few hundred changes then
+try increasing the `replication` IOQ priority:
+
+    config:set("ioq", "replication", "0.5", "FBXXXXX speed up internal replication").
+
+If this has been effective you should notice a change in the rate at which the
+metric decreases. It is worth experimenting with values as high as `1.0` however
+you will need to keep an eye on HTTP request latencies to make sure there is no
+adverse impact on other aspects of cluster performance.
+
+#### Compactions not completing quickly enough
+
+If disk usage is rising on cluster nodes and there is a corresponding backlog
+in compaction work then it might be worth increasing the `compaction` priority.
+
+Check the volume of pending changes for ongoing compaction jobs in graphite:
+
+    net.cloudant.cluster001.db1.dbcore.active_tasks.changes_pending.*compaction
+
+Increase the priority for `compaction`:
+
+    config:set("ioq", "compaction", "0.5", "FBXXXXX speed up compaction").
+
+Now monitor the changes_pending metrics to see if the rate at which changes are
+being processed has increased.
+
+The notes in the previous section apply here - experiment with values as high as
+"1.0" if necessary and keep a close eye on cluster performance whilst you
+do so.
+
+#### Interactive requests and views competing for IO resource
+
+Metrics might show that read/write performance worsens when views are building
+or conversely that view build performance slows when read/write load increases.
+If the performance requirements of the cluster are such that a particular
+type of request is more critical to the application it supports then it might be
+worth reducing the other IOQ priorities, for example:
+
+    config:set("ioq", "views", "0.1", "FBXXXXX attempt to improve read/write performance").
+
+### Concurrency
+
+The concurrency defines the total number of concurrent IO operations allowed by
+IOQ. The default value is `20`; however, it can be worth increasing if the
+answer to both of the following questions is yes:
+
+ 1. Either `net.cloudant.cluster001.db1.erlang.io_queue.active_requests` or
+    `net.cloudant.cluster001.db1.couchdb.io_queue.latency` is consistently
+    elevated.
+
+ 2. Disk utilisation is significantly less than 100%.
+
+If performance is being impacted by requests waiting in the queues then it is
+worth bumping IOQ concurrency (sensible values to try are `30`, `50` and `100`)
+and observing the resulting effect.
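+
+For example (assuming the `concurrency` key in the `ioq` config section, and
+using a placeholder FB number as in the other examples):
+
+    config:set("ioq", "concurrency", "30", "FBXXXXX bump IOQ concurrency").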
+
+Note that increasing this value beyond a certain point can result in the disks
+being overloaded and overall performance degradation. The exact point depends
+on the cluster workload and hardware so it is very important to monitor the
+cluster when making changes here.
+
+### Bypasses
+
+In extreme cases it is possible that IOQ itself is the bottleneck for certain
+request classes. If this is the case then you can bypass IOQ for that request
+class altogether, e.g. for interactive requests:
+
+    config:set("ioq.bypass", "interactive", "true", "FBXXXXX attempt to improve interactive performance").
+
+Note that bypasses are set for IOQ *classes* not IOQ priorities. This means if
+you wanted to bypass all compaction requests you would need to set a bypass for
+`db_compact` and `view_compact`.
+
+The following warnings should be heeded when considering setting an IOQ bypass:
+
+ - Other request classes will continue to be routed through IOQ so will not
+   be able to compete with the bypassed requests. You should therefore monitor
+   the cluster carefully to determine that overall performance is acceptable.
+   Keep a close eye on compaction in particular (unless it is being bypassed),
+   because if the rate of compaction slows too much the disk may start filling
+   up.
+
+ - The bypass effectively shifts the bottleneck to another part of the system
+   which is typically evident in `couch_file` and `couch_db_updater` message
+   queue backups.
+
+ - Disk performance may also become saturated which could lead to various
+   resulting performance degradations.
+
+A good rule of thumb is to avoid IOQ bypasses altogether unless the customer
+is in immediate pain.
diff --git a/priv/stats_descriptions.cfg b/priv/stats_descriptions.cfg
new file mode 100644
index 0000000..d5b15f4
--- /dev/null
+++ b/priv/stats_descriptions.cfg
@@ -0,0 +1,230 @@
+{[couchdb, io_queue, latency], [
+    {type, histogram},
+    {desc, <<"delay introduced by routing request through IO queue">>}
+]}.
+{[couchdb, io_queue, low], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO at low priority">>}
+]}.
+{[couchdb, io_queue, merged], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO queue that were merged">>}
+]}.
+{[couchdb, io_queue, osproc], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO os queue">>}
+]}.
+{[couchdb, io_queue, reads], [
+    {type, counter},
+    {desc, <<"number of read requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, writes], [
+    {type, counter},
+    {desc, <<"number of write requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, undefined], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO queue without I/O class">>}
+]}.
+{[couchdb, io_queue, unknown], [
+    {type, counter},
+    {desc, <<"number of unknown requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, db_update], [
+    {type, counter},
+    {desc, <<"DB update requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, db_compact], [
+    {type, counter},
+    {desc, <<"DB compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, view_compact], [
+    {type, counter},
+    {desc, <<"view compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, view_update], [
+    {type, counter},
+    {desc, <<"view indexing requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, interactive], [
+    {type, counter},
+    {desc, <<"IO directly triggered by client requests">>}
+]}.
+{[couchdb, io_queue, internal_repl], [
+    {type, counter},
+    {desc, <<"IO related to internal replication">>}
+]}.
+{[couchdb, io_queue, other], [
+    {type, counter},
+    {desc, <<"IO related to internal replication">>}
+]}.
+{[couchdb, io_queue_bypassed, low], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO at low priority">>}
+]}.
+{[couchdb, io_queue_bypassed, merged], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO queue that were merged">>}
+]}.
+{[couchdb, io_queue_bypassed, osproc], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO os queue">>}
+]}.
+{[couchdb, io_queue_bypassed, reads], [
+    {type, counter},
+    {desc, <<"number of read requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, writes], [
+    {type, counter},
+    {desc, <<"number of write requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, undefined], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO queue without I/O class">>}
+]}.
+{[couchdb, io_queue_bypassed, unknown], [
+    {type, counter},
+    {desc, <<"number of unknown requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, db_update], [
+    {type, counter},
+    {desc, <<"DB update requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, db_compact], [
+    {type, counter},
+    {desc, <<"DB compaction requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, view_compact], [
+    {type, counter},
+    {desc, <<"view compaction requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, view_update], [
+    {type, counter},
+    {desc, <<"view indexing requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, interactive], [
+    {type, counter},
+    {desc, <<"bypassed IO directly triggered by client requests">>}
+]}.
+{[couchdb, io_queue_bypassed, internal_repl], [
+    {type, counter},
+    {desc, <<"bypassed IO related to internal replication">>}
+]}.
+{[couchdb, io_queue_bypassed, other], [
+    {type, counter},
+    {desc, <<"bypassed IO related to internal replication">>}
+]}.
+
+
+{[couchdb, io_queue2, io_errors], [
+    {type, counter},
+    {desc, <<"number of IO errors">>}
+]}.
+{[couchdb, io_queue2, merged], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO queue that were merged">>}
+]}.
+
+{[couchdb, io_queue2, submit_delay], [
+    {type, histogram},
+    {desc, <<"delay introduced by routing request through IO queue">>}
+]}.
+{[couchdb, io_queue2, svctm], [
+    {type, histogram},
+    {desc, <<"time taken to service the IO request">>}
+]}.
+{[couchdb, io_queue2, iowait], [
+    {type, histogram},
+    {desc, <<"Total time request spent waiting on IO">>}
+]}.
+
+{[couchdb, io_queue2, low, count], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO at low priority">>}
+]}.
+{[couchdb, io_queue2, osproc, count], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO os queue">>}
+]}.
+{[couchdb, io_queue2, reads, count], [
+    {type, counter},
+    {desc, <<"number of read requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, writes, count], [
+    {type, counter},
+    {desc, <<"number of write requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, undefined, count], [
+    {type, counter},
+    {desc, <<"number of requests routed through IO queue without I/O class">>}
+]}.
+{[couchdb, io_queue2, unknown, count], [
+    {type, counter},
+    {desc, <<"number of unknown requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, db_update, count], [
+    {type, counter},
+    {desc, <<"DB update requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, db_compact, count], [
+    {type, counter},
+    {desc, <<"DB compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, view_compact, count], [
+    {type, counter},
+    {desc, <<"view compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, view_update, count], [
+    {type, counter},
+    {desc, <<"view indexing requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, interactive, count], [
+    {type, counter},
+    {desc, <<"IO directly triggered by client requests">>}
+]}.
+{[couchdb, io_queue2, db_meta, count], [
+    {type, counter},
+    {desc, <<"IO related to db_meta">>}
+]}.
+{[couchdb, io_queue2, internal_repl, count], [
+    {type, counter},
+    {desc, <<"IO related to internal replication">>}
+]}.
+{[couchdb, io_queue2, other, count], [
+    {type, counter},
+    {desc, <<"IO related to internal replication">>}
+]}.
+
+{[couchdb, io_queue2, bypassed_count], [
+    {type, counter},
+    {desc, <<"number of requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue2, reads, bypassed_count], [
+    {type, counter},
+    {desc, <<"number of read requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue2, writes, bypassed_count], [
+    {type, counter},
+    {desc, <<"number of write requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue2, unknown, bypassed_count], [
+    {type, counter},
+    {desc, <<"number of unknown requests that bypassed IO queue">>}
+]}.
+
+{[couchdb, io_queue2, reads, queued], [
+    {type, counter},
+    {desc, <<"number of read requests queued into IO queue">>}
+]}.
+{[couchdb, io_queue2, writes, queued], [
+    {type, counter},
+    {desc, <<"number of write requests queued into IO queue">>}
+]}.
+{[couchdb, io_queue2, unknown, queued], [
+    {type, counter},
+    {desc, <<"number of unknown requests queued into IO queue">>}
+]}.
+{[couchdb, io_queue2, queued], [
+    {type, counter},
+    {desc, <<"number of requests queued into IO">>}
+]}.
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..b25fdb5
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,11 @@
+{deps, [
+    {proper, ".*", {git, "https://github.com/manopapad/proper.git", "master"}}
+]}.
+
+{eunit_opts, [
+    verbose,
+    {report, {
+        eunit_surefire, [{dir,"."}]
+    }}
+]}.
+
diff --git a/src/ioq.app.src b/src/ioq.app.src
index 65ea50d..04310bb 100644
--- a/src/ioq.app.src
+++ b/src/ioq.app.src
@@ -11,11 +11,14 @@
 % the License.
 
 {application,ioq, [
-    {description, "I/O prioritizing engine"},
+    {description, "IO request management in a multi-tenant Erlang VM"},
     {vsn, git},
     {registered,[]},
-    {applications,[kernel,stdlib,config]},
+    {applications,[kernel,stdlib,config,couch_stats,hqueue]},
     {mod,{ioq_app,[]}},
-    {env, []},
-    {modules,[ioq,ioq_app,ioq_sup]}
+    {env, [
+        {stats_db, "stats"},
+        {stats_interval, 60000}
+    ]},
+    {modules,[ioq,ioq_app,ioq_osq,ioq_server,ioq_sup]}
 ]}.
diff --git a/src/ioq.erl b/src/ioq.erl
index 9ca2656..160a448 100644
--- a/src/ioq.erl
+++ b/src/ioq.erl
@@ -11,148 +11,66 @@
 % the License.
 
 -module(ioq).
--behaviour(gen_server).
--behaviour(config_listener).
+-export([start/0, stop/0, call/3, call/4, set_disk_concurrency/1,
+    get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0,
+    get_disk_counters/0, get_disk_concurrency/0]).
+-export([
+    ioq2_enabled/0
+]).
 
--export([start_link/0, call/3]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
+-define(APPS, [config, folsom, couch_stats, ioq]).
 
-% config_listener api
--export([handle_config_change/5, handle_config_terminate/3]).
+start() ->
+    lists:foldl(fun(App, _) -> application:start(App) end, ok, ?APPS).
 
--define(RELISTEN_DELAY, 5000).
+stop() ->
+    lists:foldr(fun(App, _) -> ok = application:stop(App) end, ok, ?APPS).
 
--record(state, {
-    concurrency,
-    ratio,
-    interactive=queue:new(),
-    compaction=queue:new(),
-    running=[]
-}).
-
--record(request, {
-    fd,
-    msg,
-    priority,
-    from,
-    ref
-}).
-
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
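+%% {prompt, _} and {data, _} messages (OS process calls) are routed to the
+%% OS-process queue, ioq_osq; all other requests go to the legacy ioq_server,
+%% or to ioq_server2 when IOQ2 is enabled.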
+call(Fd, Request, Arg, Priority) ->
+    call(Fd, {Request, Arg}, Priority).
 
+call(Pid, {prompt, _} = Msg, Priority) ->
+    ioq_osq:call(Pid, Msg, Priority);
+call(Pid, {data, _} = Msg, Priority) ->
+    ioq_osq:call(Pid, Msg, Priority);
 call(Fd, Msg, Priority) ->
-    Request = #request{fd=Fd, msg=Msg, priority=Priority, from=self()},
-    try
-        gen_server:call(?MODULE, Request, infinity)
-    catch
-        exit:{noproc,_} ->
-            gen_server:call(Fd, Msg, infinity)
+    case ioq2_enabled() of
+        false -> ioq_server:call(Fd, Msg, Priority);
+        true  -> ioq_server2:call(Fd, Msg, Priority)
     end.
 
-init(_) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    State = #state{},
-    {ok, read_config(State)}.
-
-read_config(State) ->
-    Ratio = list_to_float(config:get("ioq", "ratio", "0.01")),
-    Concurrency = list_to_integer(config:get("ioq", "concurrency", "10")),
-    State#state{concurrency=Concurrency, ratio=Ratio}.
-
-handle_call(#request{}=Request, From, State) ->
-    {noreply, enqueue_request(Request#request{from=From}, State), 0}.
-
-handle_cast(change, State) ->
-    {noreply, read_config(State)};
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info({Ref, Reply}, State) ->
-    case lists:keytake(Ref, #request.ref, State#state.running) of
-        {value, Request, Remaining} ->
-            erlang:demonitor(Ref, [flush]),
-            gen_server:reply(Request#request.from, Reply),
-            {noreply, State#state{running=Remaining}, 0};
-        false ->
-            {noreply, State, 0}
+set_disk_concurrency(C) when is_integer(C), C > 0 ->
+    case ioq2_enabled() of
+        false -> gen_server:call(ioq_server, {set_concurrency, C});
+        true  -> ioq_server2:set_concurrency(C)
     end;
-handle_info({'DOWN', Ref, _, _, Reason}, State) ->
-    case lists:keytake(Ref, #request.ref, State#state.running) of
-        {value, Request, Remaining} ->
-            gen_server:reply(Request#request.from, {'EXIT', Reason}),
-            {noreply, State#state{running=Remaining}, 0};
-        false ->
-            {noreply, State, 0}
-    end;
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State};
-handle_info(timeout, State) ->
-    {noreply, maybe_submit_request(State)}.
-
-handle_config_change("ioq", _, _, _, _) ->
-    {ok, gen_server:cast(?MODULE, change)};
-handle_config_change(_, _, _, _, _) ->
-    {ok, nil}.
+set_disk_concurrency(_) ->
+    erlang:error(badarg).
 
-handle_config_terminate(_Server, stop, _State) ->
-    ok;
-handle_config_terminate(_Server, _Reason, _State) ->
-    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
+get_disk_concurrency() ->
+    case ioq2_enabled() of
+        false -> gen_server:call(ioq_server, get_concurrency);
+        true  -> ioq_server2:get_concurrency()
+    end.
 
-code_change(_Vsn, State, _Extra) ->
-    {ok, State}.
+get_disk_queues() ->
+    case ioq2_enabled() of
+        false -> gen_server:call(ioq_server, get_queue_depths);
+        true  -> ioq_server2:get_queue_depths()
+    end.
 
-terminate(_Reason, _State) ->
-    ok.
+get_disk_counters() ->
+    case ioq2_enabled() of
+        false -> gen_server:call(ioq_server, get_counters);
+        true  -> ioq_server2:get_counters()
+    end.
 
-enqueue_request(#request{priority={db_compact, _}}=Request, #state{}=State) ->
-    State#state{compaction=queue:in(Request, State#state.compaction)};
-enqueue_request(#request{priority={view_compact, _, _}}=Request, #state{}=State) ->
-    State#state{compaction=queue:in(Request, State#state.compaction)};
-enqueue_request(#request{}=Request, #state{}=State) ->
-    State#state{interactive=queue:in(Request, State#state.interactive)}.
+get_osproc_queues() ->
+    gen_server:call(ioq_osq, get_queue_depths).
 
-maybe_submit_request(#state{concurrency=Concurrency, running=Running}=State)
-  when length(Running) < Concurrency ->
-    case make_next_request(State) of
-        State ->
-            State;
-        NewState when length(Running) >= Concurrency - 1 ->
-            NewState;
-        NewState ->
-            maybe_submit_request(NewState)
-    end;
-maybe_submit_request(State) ->
-    State.
+get_osproc_requests() ->
+    gen_server:call(ioq_osq, get_requests).
 
-make_next_request(#state{}=State) ->
-    case {queue:is_empty(State#state.compaction), queue:is_empty(State#state.interactive)} of
-        {true, true} ->
-            State;
-        {true, false} ->
-            choose_next_request(#state.interactive, State);
-        {false, true} ->
-            choose_next_request(#state.compaction, State);
-        {false, false} ->
-            case couch_rand:uniform() < State#state.ratio of
-                true ->
-                    choose_next_request(#state.compaction, State);
-                false ->
-                    choose_next_request(#state.interactive, State)
-            end
-    end.
-
-choose_next_request(Index, State) ->
-    case queue:out(element(Index, State)) of
-        {empty, _} ->
-            State;
-        {{value, Request}, Q} ->
-            submit_request(Request, setelement(Index, State, Q))
-    end.
+ioq2_enabled() ->
+    config:get_boolean("ioq2", "enabled", false).
 
-submit_request(#request{}=Request, #state{}=State) ->
-    Ref = erlang:monitor(process, Request#request.fd),
-    Request#request.fd ! {'$gen_call', {self(), Ref}, Request#request.msg},
-    State#state{running = [Request#request{ref=Ref} | State#state.running]}.
diff --git a/src/ioq_app.erl b/src/ioq_app.erl
index 2e6d75a..f24dcbb 100644
--- a/src/ioq_app.erl
+++ b/src/ioq_app.erl
@@ -15,6 +15,7 @@
 -export([start/2, stop/1]).
 
 start(_StartType, _StartArgs) ->
+    ok = ioq_kv:init(),
     ioq_sup:start_link().
 
 stop(_State) ->
diff --git a/src/ioq_config.erl b/src/ioq_config.erl
new file mode 100644
index 0000000..9cc2371
--- /dev/null
+++ b/src/ioq_config.erl
@@ -0,0 +1,314 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_config).
+
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("ioq/include/ioq.hrl").
+
+
+-export([
+    build_shard_priorities/0,
+    build_shard_priorities/1,
+    build_user_priorities/0,
+    build_user_priorities/1,
+    build_class_priorities/0,
+    build_class_priorities/1,
+    add_default_class_priorities/1,
+    to_float/1,
+    to_float/2,
+    parse_shard_string/1,
+    ioq_classes/0,
+    is_valid_class/1
+]).
+-export([
+    prioritize/4,
+    check_priority/3
+]).
+-export([
+    set_db_config/4,
+    set_shards_config/4,
+    set_shard_config/4,
+    set_class_config/3,
+    set_user_config/3
+]).
+-export([
+    set_bypass/3,
+    set_enabled/2,
+    set_max_priority/2,
+    set_dedupe/2,
+    set_scale_factor/2,
+    set_resize_limit/2,
+    set_concurrency/2,
+    set_dispatch_strategy/2
+]).
+
+
+-define(SHARD_CLASS_SEPARATOR, "||").
+-define(IOQ2_CONFIG, "ioq2").
+-define(IOQ2_BYPASS_CONFIG, "ioq2.bypass").
+-define(IOQ2_SHARDS_CONFIG, "ioq2.shards").
+-define(IOQ2_USERS_CONFIG, "ioq2.users").
+-define(IOQ2_CLASSES_CONFIG, "ioq2.classes").
+
+
+ioq_classes() ->
+    [Class || {Class, _Priority} <- ?DEFAULT_CLASS_PRIORITIES].
+
+
+set_bypass(Class, Value, Reason) when is_atom(Class), is_boolean(Value) ->
+    true = is_valid_class(Class),
+    set_config(?IOQ2_BYPASS_CONFIG, atom_to_list(Class), atom_to_list(Value), Reason).
+
+
+set_enabled(Value, Reason) when is_boolean(Value) ->
+    set_config(?IOQ2_CONFIG, "enabled", atom_to_list(Value), Reason).
+
+
+set_max_priority(Value, Reason) when is_float(Value) ->
+    set_config(?IOQ2_CONFIG, "max_priority", Value, Reason).
+
+
+set_dedupe(Value, Reason) when is_boolean(Value) ->
+    set_config(?IOQ2_CONFIG, "dedupe", atom_to_list(Value), Reason).
+
+
+set_scale_factor(Value, Reason) when is_float(Value) ->
+    set_config(?IOQ2_CONFIG, "scale_factor", float_to_list(Value), Reason).
+
+
+set_resize_limit(Value, Reason) when is_integer(Value) ->
+    set_config(?IOQ2_CONFIG, "resize_limit", integer_to_list(Value), Reason).
+
+
+set_concurrency(Value, Reason) when is_integer(Value) ->
+    set_config(?IOQ2_CONFIG, "concurrency", integer_to_list(Value), Reason).
+
+
+set_dispatch_strategy(Value, Reason) ->
+    ErrorMsg = "Dispatch strategy must be one of "
+        "random, fd_hash, server_per_scheduler, or single_server.",
+    ok = case Value of
+        ?DISPATCH_RANDOM               -> ok;
+        ?DISPATCH_FD_HASH              -> ok;
+        ?DISPATCH_SINGLE_SERVER        -> ok;
+        ?DISPATCH_SERVER_PER_SCHEDULER -> ok;
+        _                              -> throw({badarg, ErrorMsg})
+    end,
+    config:set(?IOQ2_CONFIG, "dispatch_strategy", Value, Reason).
+
+
+set_db_config(DbName, Class, Value, Reason) when is_binary(DbName) ->
+    ok = check_float_value(Value),
+    ok = set_shards_config(mem3:shards(DbName), Class, Value, Reason).
+
+
+set_shards_config(Shards, Class, Value, Reason) ->
+    ok = check_float_value(Value),
+    ok = lists:foreach(fun(Shard) ->
+        ok = set_shard_config(Shard, Class, Value, Reason)
+    end, Shards).
+
+
+set_shard_config(#shard{name=Name0}, Class0, Value, Reason) when is_atom(Class0) ->
+    ok = check_float_value(Value),
+    true = is_valid_class(Class0),
+    Name = binary_to_list(filename:rootname(Name0)),
+    Class = atom_to_list(Class0),
+    ConfigName = Name ++ ?SHARD_CLASS_SEPARATOR ++ Class,
+    ok = set_config(?IOQ2_SHARDS_CONFIG, ConfigName, Value, Reason).
+
+
+set_class_config(Class, Value, Reason) when is_atom(Class)->
+    ok = check_float_value(Value),
+    true = is_valid_class(Class),
+    ok = set_config(?IOQ2_CLASSES_CONFIG, atom_to_list(Class), Value, Reason).
+
+
+set_user_config(User, Value, Reason) when is_binary(User) ->
+    set_user_config(binary_to_list(User), Value, Reason);
+set_user_config(User, Value, Reason) ->
+    ok = check_float_value(Value),
+    %% TODO: validate User exists (how to do this without a Req?)
+    ok = set_config(?IOQ2_USERS_CONFIG, User, Value, Reason).
+
+
+is_valid_class(Class) ->
+    lists:member(Class, ioq_classes()).
+
+
+check_float_value(Value) when is_float(Value) ->
+    ok;
+check_float_value(_) ->
+    erlang:error({badarg, invalid_float_value}).
+
+
+set_config(Section, Key, Value, Reason) when is_float(Value) ->
+    set_config(Section, Key, float_to_list(Value), Reason);
+set_config(Section, Key, Value, Reason) when is_binary(Key) ->
+    set_config(Section, binary_to_list(Key), Value, Reason);
+set_config(Section, Key, Value, Reason) ->
+    ok = config:set(Section, Key, Value, Reason).
+
+
+-spec build_shard_priorities() -> {ok, khash:khash()}.
+build_shard_priorities() ->
+    Configs = lists:foldl(
+        fun({Key0, Val}, Acc) ->
+            case parse_shard_string(Key0) of
+                {error, ShardString} ->
+                    couch_log:error(
+                        "IOQ error parsing shard config: ~p",
+                        [ShardString]
+                    ),
+                    Acc;
+                Key ->
+                    [{Key, to_float(Val)} | Acc]
+            end
+        end,
+        [],
+        config:get("ioq2.shards")
+    ),
+    build_shard_priorities(Configs).
+
+
+-spec build_shard_priorities([{any(), float()}]) -> {ok, khash:khash()}.
+build_shard_priorities(Configs) ->
+    init_config_priorities(Configs).
+
+
+-spec build_user_priorities() -> {ok, khash:khash()}.
+build_user_priorities() ->
+    build_user_priorities(config:get("ioq2.users")).
+
+
+-spec build_user_priorities([{any(), float()}]) -> {ok, khash:khash()}.
+build_user_priorities(Configs0) ->
+    Configs = [{list_to_binary(K), to_float(V)} || {K,V} <- Configs0],
+    init_config_priorities(Configs).
+
+
+-spec build_class_priorities() -> {ok, khash:khash()}.
+build_class_priorities() ->
+    build_class_priorities(config:get("ioq2.classes")).
+
+
+-spec build_class_priorities([{any(), float()}]) -> {ok, khash:khash()}.
+build_class_priorities(Configs0) ->
+    {ok, ClassP} = khash:new(),
+    ok = add_default_class_priorities(ClassP),
+    Configs = [{list_to_existing_atom(K), to_float(V)} || {K,V} <- Configs0],
+    init_config_priorities(Configs, ClassP).
+
+
+-spec parse_shard_string(string()) -> {binary(), atom()}
+    | {error, string()}.
+parse_shard_string(ShardString) ->
+    case string:tokens(ShardString, ?SHARD_CLASS_SEPARATOR) of
+        [Shard, Class] ->
+            {list_to_binary(Shard), list_to_existing_atom(Class)};
+        _ ->
+            {error, ShardString}
+    end.
+
+
+-spec add_default_class_priorities(khash:khash()) -> ok.
+add_default_class_priorities(ClassP) ->
+    ok = lists:foreach(
+        fun({Class, Priority}) ->
+            ok = khash:put(ClassP, Class, Priority)
+        end,
+        ?DEFAULT_CLASS_PRIORITIES
+    ).
+
+
+-spec to_float(any()) -> float().
+to_float(V) ->
+    to_float(V, ?DEFAULT_PRIORITY).
+
+
+-spec to_float(any(), float()) -> float().
+to_float(Float, _) when is_float(Float) ->
+    Float;
+to_float(Int, _) when is_integer(Int) ->
+    float(Int);
+to_float(String, Default) when is_list(String) ->
+    try
+        list_to_float(String)
+    catch error:badarg ->
+        try
+            to_float(list_to_integer(String))
+        catch error:badarg ->
+            Default
+        end
+    end;
+to_float(_, Default) ->
+    Default.
+
+
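+%% A request's effective priority is the product of its user, class, and
+%% shard/class multipliers; any dimension without explicit configuration
+%% falls back to the default priority.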
+-spec prioritize(ioq_request(), khash:khash(), khash:khash(), khash:khash()) ->
+    float().
+prioritize(#ioq_request{} = Req, ClassP, UserP, ShardP) ->
+    #ioq_request{
+        user=User,
+        shard=Shard,
+        class=Class
+    } = Req,
+    UP = get_priority(UserP, User),
+    CP = get_priority(ClassP, Class),
+    SP = get_priority(ShardP, {Shard, Class}),
+    UP * CP * SP.
+
+
+-spec init_config_priorities([{any(), float()}]) -> {ok, khash:khash()}.
+init_config_priorities(Configs) ->
+    {ok, Hash} = khash:new(),
+    init_config_priorities(Configs, Hash).
+
+
+-spec init_config_priorities([{any(), float()}], khash:khash()) ->
+    {ok, khash:khash()}.
+init_config_priorities(Configs, Hash) ->
+    ok = lists:foreach(
+        fun({Key, Val}) ->
+            ok = khash:put(Hash, Key, Val)
+        end,
+        Configs
+    ),
+    {ok, Hash}.
+
+
+-spec check_priority(atom(), binary(), binary()) -> float().
+check_priority(Class, User, Shard0) ->
+    {ok, ClassP} = build_class_priorities(),
+    {ok, UserP} = build_user_priorities(),
+    {ok, ShardP} = build_shard_priorities(),
+
+    Shard = filename:rootname(Shard0),
+    Req = #ioq_request{
+        user = User,
+        shard = Shard,
+        class = Class
+    },
+
+    prioritize(Req, ClassP, UserP, ShardP).
+
+
+get_priority(KH, Key) ->
+    get_priority(KH, Key, ?DEFAULT_PRIORITY).
+
+
+get_priority(_KH, undefined, Default) ->
+    Default;
+get_priority(KH, Key, Default) ->
+    khash:get(KH, Key, Default).
diff --git a/src/ioq_config_listener.erl b/src/ioq_config_listener.erl
new file mode 100644
index 0000000..fc10880
--- /dev/null
+++ b/src/ioq_config_listener.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_config_listener).
+
+-vsn(2).
+-behaviour(config_listener).
+
+-export([
+    subscribe/0
+]).
+
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+subscribe() ->
+    config:listen_for_changes(?MODULE, nil).
+
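+%% Any change under the "ioq", "ioq2", or "ioq2.*" config sections casts
+%% update_config to every IOQ2 server so new settings take effect without
+%% a restart.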
+handle_config_change("ioq", _Key, _Val, _Persist, St) ->
+    ok = notify_ioq_pids(),
+    {ok, St};
+handle_config_change("ioq2", _Key, _Val, _Persist, St) ->
+    ok = notify_ioq_pids(),
+    {ok, St};
+handle_config_change("ioq2."++_Type, _Key, _Val, _Persist, St) ->
+    ok = notify_ioq_pids(),
+    {ok, St};
+handle_config_change(_Sec, _Key, _Val, _Persist, St) ->
+    {ok, St}.
+
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(_, _, _) ->
+    % We may have missed a change in the last five seconds
+    gen_server:cast(ioq_server, update_config),
+    spawn(fun() ->
+        timer:sleep(5000),
+        config:listen_for_changes(?MODULE, nil)
+    end).
+
+notify_ioq_pids() ->
+    ok = lists:foreach(fun(Pid) ->
+        gen_server:cast(Pid, update_config)
+    end, ioq_sup:get_ioq2_servers()).
diff --git a/src/ioq_kv.erl b/src/ioq_kv.erl
new file mode 100644
index 0000000..cf8b077
--- /dev/null
+++ b/src/ioq_kv.erl
@@ -0,0 +1,169 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% Based on Bob Ippolito's mochiglobal.erl
+
+%%
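+%% Key/value pairs are baked into a dynamically compiled module (ioq_kv_dyn),
+%% so lookups are cheap function calls while every put/2 or delete/1
+%% recompiles and reloads the module.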
+-module(ioq_kv).
+-export([init/0]).
+-export([all/0, get/1, get/2, put/2, delete/1]).
+-export([validate_term/1]).
+
+-define(DYNMOD, ioq_kv_dyn).
+-define(ERLFILE, "ioq_kv_dyn.erl").
+
+-spec init() -> ok.
+%% @doc Initialize the dynamic module
+init() ->
+    compile(all()).
+
+-spec all() -> [{any(), any()}].
+%% @doc Get the list of Key/Val pairs stored
+all() ->
+    try
+        ?DYNMOD:list()
+    catch error:undef ->
+        []
+    end.
+
+-spec get(any()) -> any() | undefined.
+%% @equiv get(Key, undefined)
+get(Key) ->
+    get(Key, undefined).
+
+-spec get(any(), T) -> any() | T.
+%% @doc Get the term for Key or return Default.
+get(Key, Default) ->
+    try
+        ?DYNMOD:lookup(Key, Default)
+    catch error:undef ->
+        Default
+    end.
+
+-spec put(any(), any()) -> ok.
+%% @doc Store term Val at Key, replaces an existing term if present.
+put(Key, Val) ->
+    KVs = proplists:delete(Key, all()),
+    compile([{Key, Val} | KVs]).
+
+-spec delete(any()) -> ok.
+%% @doc Delete term stored at Key, no-op if non-existent.
+delete(Key) ->
+    KVs = proplists:delete(Key, all()),
+    compile(KVs).
+
+
+compile(KVs) ->
+    Bin = compile_mod(KVs),
+    code:purge(?DYNMOD),
+    {module, ?DYNMOD} = code:load_binary(?DYNMOD, ?ERLFILE, Bin),
+    ok.
+
+
+-spec compile_mod([any()]) -> binary().
+compile_mod(KVs) ->
+    Opts = [verbose, report_errors],
+    {ok, ?DYNMOD, Bin} = compile:forms(forms(KVs), Opts),
+    Bin.
+
+
+-spec forms([any()]) -> [erl_syntax:syntaxTree()].
+forms(KVs) ->
+    validate_term(KVs),
+    Statements = [
+        module_stmt(),
+        export_stmt(),
+        list_function(KVs),
+        lookup_function(KVs)
+    ],
+    [erl_syntax:revert(X) || X <- Statements].
+
+
+-spec module_stmt() -> erl_syntax:syntaxTree().
+module_stmt() ->
+    erl_syntax:attribute(
+        erl_syntax:atom(module),
+        [erl_syntax:atom(?DYNMOD)]
+    ).
+
+-spec export_stmt() -> erl_syntax:syntaxTree().
+export_stmt() ->
+    erl_syntax:attribute(
+        erl_syntax:atom(export),
+        [erl_syntax:list([
+            erl_syntax:arity_qualifier(
+                erl_syntax:atom(list),
+                erl_syntax:integer(0)),
+            erl_syntax:arity_qualifier(
+                erl_syntax:atom(lookup),
+                erl_syntax:integer(2))
+        ])]
+    ).
+
+
+-spec list_function([any()]) -> erl_syntax:syntaxTree().
+list_function(KVs) ->
+    erl_syntax:function(
+        erl_syntax:atom(list),
+        [erl_syntax:clause([], none, [erl_syntax:abstract(KVs)])]).
+
+
+-spec lookup_function([any()]) -> erl_syntax:syntaxTree().
+lookup_function(KVs) ->
+    Clauses = lists:foldl(fun({K, V}, ClauseAcc) ->
+        Patterns = [erl_syntax:abstract(K), erl_syntax:underscore()],
+        Bodies = [erl_syntax:abstract(V)],
+        [erl_syntax:clause(Patterns, none, Bodies) | ClauseAcc]
+    end, [default_clause()], KVs),
+    erl_syntax:function(erl_syntax:atom(lookup), Clauses).
+
+
+-spec default_clause() -> erl_syntax:syntaxTree().
+default_clause() ->
+    Patterns = [erl_syntax:underscore(), erl_syntax:variable("Default")],
+    Bodies = [erl_syntax:variable("Default")],
+    erl_syntax:clause(Patterns, none, Bodies).
+
+
+-spec validate_term(any()) -> ok.
+%% @doc Validate that a term is supported. Throws invalid_term
+%% on error.
+validate_term(T) when is_list(T) ->
+    validate_list(T);
+validate_term(T) when is_tuple(T) ->
+    validate_tuple(T);
+validate_term(T) when is_bitstring(T) ->
+    case bit_size(T) rem 8 of
+        0 -> ok;
+        _ -> erlang:error(invalid_term)
+    end;
+validate_term(_T) ->
+    ok.
+
+-spec validate_list(list()) -> ok.
+validate_list([]) ->
+    ok;
+validate_list([H|T]) ->
+    validate_term(H),
+    validate_list(T).
+
+-spec validate_tuple(tuple()) -> ok.
+validate_tuple(T) ->
+    validate_tuple(T, 1, size(T)).
+
+-spec validate_tuple(tuple(), pos_integer(), pos_integer()) -> ok.
+validate_tuple(T, Pos, Size) when Pos =< Size ->
+    validate_term(element(Pos, T)),
+    validate_tuple(T, Pos+1, Size);
+validate_tuple(_, _, _) ->
+    ok.
+
diff --git a/src/ioq_osq.erl b/src/ioq_osq.erl
new file mode 100644
index 0000000..2dbfaea
--- /dev/null
+++ b/src/ioq_osq.erl
@@ -0,0 +1,253 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_osq).
+-behaviour(gen_server).
+-vsn(1).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+    code_change/3]).
+
+-export([start_link/0, call/3]).
+
+-record(channel, {
+    name,
+    q = queue:new()
+}).
+
+-record(state, {
+    reqs = [],
+    min = 2,
+    max = 6,
+    global_max = 15,
+    channels = queue:new()
+}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% This server relies on the internal structure of the channels queue as a   %%
+%% {list(), list()} to do in-place modifications of some elements.  We are   %%
+%% "running on thin ice", as it were.                                        %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
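+%% Acquire a slot from ioq_osq before calling the OS process directly: the
+%% server grants the slot by replying {Ref, nil}, the caller then performs
+%% its own gen_server call, and finally sends the {Ref, nil} message back so
+%% the server can release the slot.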
+call(Pid, Msg, Priority) ->
+    Reply = gen_server:call(ioq_osq, {rlimit, nil, Priority, now()}, infinity),
+    try
+        gen_server:call(Pid, Msg, infinity)
+    after
+        whereis(ioq_osq) ! Reply
+    end.
+
+
+init([]) ->
+    ets:new(osq_counters, [named_table]),
+    St = #state{},
+    {ok, St#state{
+        min = threshold("minimum", St#state.min),
+        max = threshold("maximum", St#state.max),
+        global_max = threshold("global_maximum", St#state.global_max)
+    }}.
+
+handle_call({set_minimum, C}, _From, State) when is_integer(C), C > 0 ->
+    {reply, ok, State#state{min = C}};
+
+handle_call({set_maximum, C}, _From, State) when is_integer(C), C > 0 ->
+    {reply, ok, State#state{max = C}};
+
+handle_call({set_global_maximum, C}, _From, State) when is_integer(C), C > 0 ->
+    {reply, ok, State#state{global_max = C}};
+
+handle_call(get_queue_depths, _From, State) ->
+    Channels = [{N, queue:len(Q)} || #channel{name=N, q=Q}
+        <- queue:to_list(State#state.channels)],
+    {reply, Channels, State};
+
+handle_call(get_requests, _From, State) ->
+    {reply, State#state.reqs, State};
+
+handle_call({_, _, {interactive, Shard}, _} = Req, From, State) ->
+    {noreply, enqueue_channel(channel_name(Shard), {Req, From}, State)};
+
+handle_call({_, _, {view_update, Shard, _}, _} = Req, From, State) ->
+    {noreply, enqueue_channel(channel_name(Shard), {Req, From}, State)};
+
+handle_call({Fd, _, _, _} = Req, From, State) when is_pid(Fd) ->
+    {noreply, enqueue_channel(other, {Req, From}, State)};
+
+handle_call({rlimit, _, _, _} = Req, From, State) ->
+    {noreply, enqueue_channel(other, {Req, From}, State)};
+
+handle_call(_Msg, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
+    case lists:keyfind(Ref, 3, Reqs) of
+    {_, notify, Ref} ->
+        erlang:demonitor(Ref, [flush]),
+        Reqs2 = lists:keydelete(Ref, 3, Reqs),
+        {noreply, make_next_request(State#state{reqs = Reqs2})};
+    {_, From, Ref} ->
+        erlang:demonitor(Ref, [flush]),
+        gen_server:reply(From, Reply),
+        Reqs2 = lists:keydelete(Ref, 3, Reqs),
+        {noreply, make_next_request(State#state{reqs = Reqs2})};
+    false ->
+        {noreply, State}
+    end;
+
+handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) ->
+    case lists:keyfind(Ref, 3, Reqs) of
+    {_, notify, Ref} ->
+        Reqs2 = lists:keydelete(Ref, 3, Reqs),
+        {noreply, make_next_request(State#state{reqs = Reqs2})};
+    {_, From, Ref} ->
+        gen_server:reply(From, {'EXIT', Reason}),
+        Reqs2 = lists:keydelete(Ref, 3, Reqs),
+        {noreply, make_next_request(State#state{reqs = Reqs2})};
+    false ->
+        {noreply, State}
+    end;
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
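+%% Derive the per-account channel name from a shard path; e.g. (illustrative)
+%% <<"shards/00000000-1fffffff/acct/db.1234">> yields <<"acct">>, and anything
+%% that does not look like a shard path falls back to the shared 'other'
+%% channel.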
+channel_name(Shard) ->
+    try re:split(Shard, "/") of
+    [<<"shards">>, _, <<"heroku">>, AppId | _] ->
+        <<AppId/binary, ".heroku">>;
+    [<<"shards">>, _, Account | _] ->
+        Account;
+    _ ->
+        other
+    catch _:_ ->
+        other
+    end.
+
+find_channel(Account, {A, B}) ->
+    case lists:keyfind(Account, 2, A) of
+    false ->
+        case lists:keyfind(Account, 2, B) of
+        false ->
+            {new, #channel{name = Account}};
+        #channel{} = Channel ->
+            {2, Channel}
+        end;
+    #channel{} = Channel ->
+        {1, Channel}
+    end.
+
+update_channel(#channel{q = Q} = Ch, Req) ->
+    Ch#channel{q = queue:in(Req, Q)}.
+
+enqueue_channel(Account, Req, #state{channels = Q} = State) ->
+    NewState = case find_channel(Account, Q) of
+    {new, Channel0} ->
+        State#state{channels = queue:in(update_channel(Channel0, Req), Q)};
+    {Elem, Channel0} ->
+        Channel = update_channel(Channel0, Req),
+        % the channel already exists in the queue - update it in place
+        L = element(Elem, Q),
+        NewQ = setelement(Elem, Q, lists:keyreplace(Account, 2, L, Channel)),
+        State#state{channels = NewQ}
+    end,
+    maybe_submit_request(NewState).
+
+maybe_submit_request(#state{global_max=C, reqs=R} = St) when length(R) < C ->
+    make_next_request(St);
+maybe_submit_request(#state{min = Min} = State) ->
+    % look for a channel which hasn't reached the minimum yet
+    make_next_request(State, Min).
+
+make_next_request(#state{max = Max} = State) ->
+    % default behavior, look for a channel not yet maxed out
+    make_next_request(State, Max).
+
+make_next_request(#state{channels = Channels, reqs = R} = State, Threshold) ->
+    case next_unblocked_channel(Channels, R, Threshold, queue:new()) of
+    {#channel{name = Name, q = Q} = Ch, OutChannels} ->
+        {{value, Item}, NewQ} = queue:out(Q),
+        case queue:is_empty(NewQ) of
+        true ->
+            NewCh = OutChannels;
+        false ->
+            NewCh = queue:in(Ch#channel{q = NewQ}, OutChannels)
+        end,
+        submit_request(Name, Item, State#state{channels = NewCh});
+    {nil, OutQ} ->
+        % everybody is using their allotted slots, try again later
+        State#state{channels = OutQ}
+    end.
+
+next_unblocked_channel(InQ, Reqs, Max, OutQ)  ->
+    case queue:out(InQ) of
+    {empty, _} -> % all channels blocked
+        {nil, OutQ};
+    {{value, #channel{name=Name} = Channel}, NewQ} ->
+        case length([1 || {N, _, _} <- Reqs, N =:= Name]) >= Max of
+        true -> % channel is blocked, keep searching
+            next_unblocked_channel(NewQ, Reqs, Max, queue:in(Channel, OutQ));
+        false ->
+            {Channel, queue:join(NewQ, OutQ)}
+        end
+    end.
+
+
+submit_request(Channel, {{rlimit,_,Pri,T0}, From}, #state{reqs=Reqs} = State) ->
+    % rlimit fd means that we'll get a response back
+    % from the pid after it performs the call on its
+    % own
+    Ref = erlang:monitor(process, element(1, From)),
+    gen_server:reply(From, {Ref, nil}),
+    record_stats(Channel, Pri, T0),
+    State#state{reqs = [{Channel, notify, Ref} | Reqs]};
+
+submit_request(Channel, {{Fd,Call,Pri,T0}, From}, #state{reqs=Reqs} = State) ->
+    % make the request
+    Ref = erlang:monitor(process, Fd),
+    Fd ! {'$gen_call', {self(), Ref}, Call},
+    record_stats(Channel, Pri, T0),
+    State#state{reqs = [{Channel, From, Ref} | Reqs]}.
+
+record_stats(Channel, Pri, T0) ->
+    IOClass = if is_tuple(Pri) -> element(1, Pri); true -> Pri end,
+    Latency = timer:now_diff(now(),T0) / 1000,
+    catch couch_stats:increment_counter([couchdb, io_queue, IOClass]),
+    catch couch_stats:increment_counter([couchdb, io_queue, osproc]),
+    catch couch_stats:update_histogram([couchdb, io_queue, latency], Latency),
+    update_counter(Channel, IOClass, osproc).
+
+update_counter(Channel, IOClass, RW) ->
+    try ets:update_counter(osq_counters, {Channel, IOClass, RW}, 1)
+    catch error:badarg ->
+        ets:insert(osq_counters, {{Channel, IOClass, RW}, 1})
+    end.
+
+threshold(Name, Default) ->
+    try list_to_integer(config:get("osq", Name)) of
+    C when C > 0 ->
+        C;
+    _ ->
+        Default
+    catch _:_ ->
+        Default
+    end.
diff --git a/src/ioq_server.erl b/src/ioq_server.erl
new file mode 100644
index 0000000..8e0891d
--- /dev/null
+++ b/src/ioq_server.erl
@@ -0,0 +1,608 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_server).
+-behaviour(gen_server).
+-vsn(1).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+    code_change/3]).
+
+-export([start_link/0, call/3]).
+-export([add_channel/3, rem_channel/2, list_custom_channels/0]).
+
+-record(channel, {
+    name,
+    qI = queue:new(), % client readers
+    qU = queue:new(), % db updater
+    qV = queue:new()  % view index updates
+}).
+
+-record(state, {
+    counters,
+    histos,
+    reqs = [],
+    concurrency = 20,
+    channels = queue:new(),
+    qC = queue:new(), % compaction
+    qR = queue:new(), % internal replication
+    qL = queue:new(),
+    dedupe,
+    class_priorities,
+    op_priorities
+}).
+
+-record(request, {
+    fd,
+    msg,
+    class,
+    channel, % the name of the channel, not the actual data structure
+    from,
+    ref,
+    t0,
+    tsub
+}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% This server relies on the internal structure of the channels queue as a   %%
+%% {list(), list()} to do in-place modifications of some elements.  We are   %%
+%% "running on thin ice", as it were.                                        %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
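+%% Route a disk IO request through the queue unless a bypass is configured
+%% for its IO class, in which case the call goes straight to the Fd process
+%% and is only counted in the bypass stats.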
+call(Fd, Msg, Priority) ->
+    {Class, Channel} = analyze_priority(Priority),
+    Request = #request{
+        fd = Fd,
+        msg = Msg,
+        channel = Channel,
+        class = Class,
+        t0 = now()
+    },
+    case config:get("ioq.bypass", atom_to_list(Class)) of
+        "true" ->
+            RW = rw(Msg),
+            catch couch_stats:increment_counter([couchdb, io_queue_bypassed, Class]),
+            catch couch_stats:increment_counter([couchdb, io_queue_bypassed, RW]),
+            gen_server:call(Fd, Msg, infinity);
+        _ ->
+            gen_server:call(?MODULE, Request, infinity)
+    end.
+
+add_channel(Account, DbName, ChannelName) ->
+    ok = ioq_kv:put({Account, DbName}, ChannelName).
+
+rem_channel(Account, DbName) ->
+    ok = ioq_kv:delete({Account, DbName}).
+
+list_custom_channels() ->
+    ioq_kv:all().
+
+init([]) ->
+    State = #state{
+        counters = ets:new(ioq_counters, []),
+        histos = ets:new(ioq_histos, [named_table, ordered_set])
+    },
+    erlang:send_after(get_interval(), self(), dump_table),
+    {ok, update_config(State)}.
+
+handle_call({set_priority, Pri}, _From, State) ->
+    {reply, process_flag(priority, Pri), State, 0};
+
+handle_call({set_concurrency, C}, _From, State) when is_integer(C), C > 0 ->
+    {reply, State#state.concurrency, State#state{concurrency = C}, 0};
+
+handle_call(get_concurrency, _From, State) ->
+    {reply, State#state.concurrency, State, 0};
+
+handle_call(get_counters, _From, #state{counters = Tab} = State) ->
+    {reply, Tab, State, 0};
+
+handle_call(get_queue_depths, _From, State) ->
+    Channels = lists:map(fun(#channel{name=N, qI=I, qU=U, qV=V}) ->
+        {N, [queue:len(I), queue:len(U), queue:len(V)]}
+    end, queue:to_list(State#state.channels)),
+    Response = [
+        {compaction, queue:len(State#state.qC)},
+        {replication, queue:len(State#state.qR)},
+        {low, queue:len(State#state.qL)},
+        {channels, {Channels}}
+    ],
+    {reply, Response, State, 0};
+
+handle_call(reset_histos, _From, State) ->
+    ets:delete_all_objects(State#state.histos),
+    {reply, ok, State, 0};
+
+handle_call(#request{} = Req, From, State) ->
+    {noreply, enqueue_request(Req#request{from = From}, State), 0};
+
+%% backwards-compatible mode for messages sent during hot upgrade
+handle_call({Fd, Msg, Priority, T0}, From, State) ->
+    {Class, Chan} = analyze_priority(Priority),
+    R = #request{fd=Fd, msg=Msg, channel=Chan, class=Class, t0=T0, from=From},
+    {noreply, enqueue_request(R, State), 0};
+
+handle_call(_Msg, _From, State) ->
+    {reply, ignored, State, 0}.
+
+handle_cast(update_config, State) ->
+    {noreply, update_config(State), 0};
+
+handle_cast(_Msg, State) ->
+    {noreply, State, 0}.
+
+handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
+    case lists:keytake(Ref, #request.ref, Reqs) of
+    {value, #request{from=From} = Req, Reqs2} ->
+        TResponse = erlang:now(),
+        erlang:demonitor(Ref, [flush]),
+        reply_to_all(From, Reply),
+        update_histograms(ioq_histos, Req, TResponse),
+        {noreply, State#state{reqs = Reqs2}, 0};
+    false ->
+        {noreply, State, 0}
+    end;
+
+handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) ->
+    case lists:keytake(Ref, #request.ref, Reqs) of
+    {value, #request{from=From}, Reqs2} ->
+        reply_to_all(From, {'EXIT', Reason}),
+        {noreply, State#state{reqs = Reqs2}, 0};
+    false ->
+        {noreply, State, 0}
+    end;
+
+handle_info(dump_table, State) ->
+    erlang:send_after(get_interval(), self(), dump_table),
+    {noreply, dump_table(State), 0};
+
+handle_info(timeout, State) ->
+    {noreply, maybe_submit_request(State)};
+
+handle_info(_Info, State) ->
+    {noreply, State, 0}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, #state{}=State, _Extra) ->
+    {ok, State}.
+
+update_config(State) ->
+    Concurrency = try
+        list_to_integer(config:get("ioq", "concurrency", "20"))
+    catch _:_ ->
+        20
+    end,
+
+    DeDupe = config:get("ioq", "dedupe", "true") == "true",
+
+    P1 = to_float(catch config:get("ioq", "customer", "1.0")),
+    P2 = to_float(catch config:get("ioq", "replication", "0.001")),
+    P3 = to_float(catch config:get("ioq", "compaction", "0.0001")),
+    P4 = to_float(catch config:get("ioq", "low", "0.0001")),
+
+    P5 = to_float(catch config:get("ioq", "reads", "1.0")),
+    P6 = to_float(catch config:get("ioq", "writes", "1.0")),
+    P7 = to_float(catch config:get("ioq", "views", "1.0")),
+
+    State#state{
+        concurrency = Concurrency,
+        dedupe = DeDupe,
+        class_priorities = [P1, P2, P3, P4],
+        op_priorities = [P5, P6, P7]
+    }.
+
+reply_to_all([], _Reply) ->
+    ok;
+reply_to_all([From|Rest], Reply) ->
+    gen_server:reply(From, Reply),
+    reply_to_all(Rest, Reply);
+reply_to_all(From, Reply) ->
+    gen_server:reply(From, Reply).
+
+analyze_priority({interactive, Shard}) ->
+    {interactive, channel_name(Shard)};
+analyze_priority({db_update, Shard}) ->
+    {db_update, channel_name(Shard)};
+analyze_priority({view_update, Shard, _GroupId}) ->
+    {view_update, channel_name(Shard)};
+analyze_priority({db_compact, _Shard}) ->
+    {db_compact, nil};
+analyze_priority({view_compact, _Shard, _GroupId}) ->
+    {view_compact, nil};
+analyze_priority({internal_repl, _Shard}) ->
+    {internal_repl, nil};
+analyze_priority({low, _Shard}) ->
+    {low, nil};
+analyze_priority(_Else) ->
+    {other, other}.
+
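+%% Map a shard path to its channel, honoring custom channels registered via
+%% add_channel/3; e.g. (illustrative) <<"shards/00000000-1fffffff/acct/db.1234">>
+%% looks up a custom channel for {<<"acct">>, <<"db.1234">>} and falls back to
+%% <<"acct">> itself.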
+channel_name(Shard) ->
+    try split(Shard) of
+    [<<"shards">>, _, <<"heroku">>, AppId | _] ->
+        <<AppId/binary, ".heroku">>;
+    [<<"shards">>, _, DbName] ->
+        ioq_kv:get({other, DbName}, other);
+    [<<"shards">>, _, Account, DbName] ->
+        ioq_kv:get({Account, DbName}, Account);
+    [<<"shards">>, _, Account | DbParts] ->
+        ioq_kv:get({Account, filename:join(DbParts)}, Account);
+    _ ->
+        other
+    catch _:_ ->
+        other
+    end.
+
+enqueue_request(#request{class = db_compact} = Req, State) ->
+    State#state{qC = update_queue(Req, State#state.qC, State#state.dedupe)};
+enqueue_request(#request{class = view_compact} = Req, State) ->
+    State#state{qC = update_queue(Req, State#state.qC, State#state.dedupe)};
+enqueue_request(#request{class = internal_repl} = Req, State) ->
+    State#state{qR = update_queue(Req, State#state.qR, State#state.dedupe)};
+enqueue_request(#request{class = low} = Req, State) ->
+    State#state{qL = update_queue(Req, State#state.qL, State#state.dedupe)};
+enqueue_request(Req, State) ->
+    enqueue_channel(Req, State).
+
+find_channel(Account, {A, B}) ->
+    case lists:keyfind(Account, #channel.name, A) of
+    false ->
+        case lists:keyfind(Account, #channel.name, B) of
+        false ->
+            {new, #channel{name = Account}};
+        #channel{} = Channel ->
+            {2, Channel}
+        end;
+    #channel{} = Channel ->
+        {1, Channel}
+    end.
+
+update_channel(Ch, #request{class = view_update} = Req, Dedupe) ->
+    Ch#channel{qV = update_queue(Req, Ch#channel.qV, Dedupe)};
+update_channel(Ch, #request{class = db_update} = Req, Dedupe) ->
+    Ch#channel{qU = update_queue(Req, Ch#channel.qU, Dedupe)};
+update_channel(Ch, Req, Dedupe) ->
+    % everything else is interactive IO class
+    Ch#channel{qI = update_queue(Req, Ch#channel.qI, Dedupe)}.
+
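+%% With dedupe enabled, a read for the same Fd and position that is already
+%% queued is merged with the new caller instead of being enqueued again; all
+%% merged callers are answered from the single disk read via reply_to_all/2.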
+update_queue(#request{from=From, fd=Fd, msg={pread_iolist, Pos}}=Req, Q, DD) ->
+    case maybe_dedupe(Fd, Pos, Q, DD) of
+    false ->
+        queue:in(Req, Q);
+    {Elem, N, #request{from=From1}=Match} ->
+        catch couch_stats:increment_counter([couchdb, io_queue, merged]),
+        Match1 = Match#request{from=append(From, From1)},
+        L = element(Elem, Q),
+        {H, [Match|T]} = lists:split(N, L),
+        setelement(Elem, Q, H ++ [Match1|T])
+    end;
+update_queue(Req, Q, _Dedupe) ->
+    queue:in(Req, Q).
+
+append(A, B) when is_list(B) ->
+    [A|B];
+append(A, B) ->
+    [A, B].
+
+maybe_dedupe(Fd, Pos, Q, Dedupe) ->
+    case Dedupe of
+        true -> matching_request(Fd, Pos, Q);
+        false -> false
+    end.
+
+matching_request(Fd, Pos, {A, B}) ->
+    case matching_request(Fd, Pos, A) of
+    false ->
+        case matching_request(Fd, Pos, B) of
+        false ->
+            false;
+        {N, Request} ->
+            {2, N, Request}
+        end;
+    {N, Request} ->
+        {1, N, Request}
+    end;
+matching_request(Fd, Pos, List) ->
+    matching_request(Fd, Pos, 0, List).
+
+matching_request(_Fd, _Pos, _N, []) ->
+    false;
+matching_request(Fd, Pos, N, [#request{fd=Fd, msg={pread_iolist, Pos}}=Req|_]) ->
+    {N, Req};
+matching_request(Fd, Pos, N, [_|Rest]) ->
+    matching_request(Fd, Pos, N + 1, Rest).
+
+enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
+    DD = State#state.dedupe,
+    case find_channel(Account, Q) of
+    {new, Channel0} ->
+        State#state{channels = queue:in(update_channel(Channel0, Req, DD), Q)};
+    {Elem, Channel0} ->
+        Channel = update_channel(Channel0, Req, DD),
+        % the channel already exists in the queue - update it in place
+        L = element(Elem, Q),
+        NewL = lists:keyreplace(Account, #channel.name, L, Channel),
+        NewQ = setelement(Elem, Q, NewL),
+        State#state{channels = NewQ}
+    end.
+
+maybe_submit_request(#state{concurrency=C,reqs=R} = St) when length(R) < C ->
+    case make_next_request(St) of
+        St ->
+            St;
+        NewState when length(R) >= C-1 ->
+            NewState;
+        NewState ->
+            maybe_submit_request(NewState)
+    end;
+maybe_submit_request(State) ->
+    State.
+
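+%% Weighted random ordering of the queues: Choice is a uniform draw in [0,1)
+%% and each queue's chance of being visited first is proportional to its
+%% configured priority.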
+sort_queues(QueuesAndPriorities, Normalization, Choice) ->
+    sort_queues(QueuesAndPriorities, Normalization, Choice, 0, [], []).
+
+sort_queues([{Q, _Pri}], _Norm, _Choice, _X, [], Acc) ->
+    lists:reverse([Q | Acc]);
+sort_queues([{Q, Pri}], Norm, Choice, _X, Skipped, Acc) ->
+    sort_queues(lists:reverse(Skipped), Norm - Pri, Choice, 0, [], [Q | Acc]);
+sort_queues([{Q, Pri} | Rest], Norm, Choice, X, Skipped, Acc) ->
+    if Choice < ((X + Pri) / Norm) ->
+        Remaining = lists:reverse(Skipped, Rest),
+        sort_queues(Remaining, Norm - Pri, Choice, 0, [], [Q | Acc]);
+    true ->
+        sort_queues(Rest, Norm, Choice, X + Pri, [{Q, Pri} | Skipped], Acc)
+    end.
+
+make_next_request(State) ->
+    #state{
+        channels = Ch,
+        qC = QC,
+        qR = QR,
+        qL = QL,
+        class_priorities = ClassP,
+        op_priorities = OpP
+    } = State,
+
+
+    {Item, [NewCh, NewQR, NewQC, NewQL]} =
+        choose_next_request([Ch, QR, QC, QL], ClassP),
+
+    case Item of nil ->
+        State;
+    #channel{qI = QI, qU = QU, qV = QV} = Channel ->
+        % An IO channel has at least one interactive or view indexing request.
+        % If the channel has more than one request, we'll toss it back into the
+        % queue after we've extracted one here
+        {Item2, [QI2, QU2, QV2]} =
+            choose_next_request([QI, QU, QV], OpP),
+        case queue:is_empty(QU2) andalso
+             queue:is_empty(QI2) andalso
+             queue:is_empty(QV2) of
+        true ->
+            NewCh2 = NewCh;
+        false ->
+            NewCh2 = queue:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
+        end,
+        submit_request(Item2, State#state{channels=NewCh2, qC=NewQC,
+            qR=NewQR, qL=NewQL});
+    _ ->
+        % Item is a background (compaction or internal replication) task
+        submit_request(Item, State#state{channels=NewCh, qC=NewQC, qR=NewQR,
+            qL=NewQL})
+    end.
+
+submit_request(Request, State) ->
+    #request{
+        channel = Channel,
+        fd = Fd,
+        msg = Call,
+        t0 = T0,
+        class = IOClass
+    } = Request,
+    #state{reqs = Reqs, counters = Counters} = State,
+
+    % make the request
+    Ref = erlang:monitor(process, Fd),
+    Fd ! {'$gen_call', {self(), Ref}, Call},
+
+    % record some stats
+    RW = rw(Call),
+    SubmitTime = now(),
+    Latency = timer:now_diff(SubmitTime, T0) / 1000,
+    catch couch_stats:increment_counter([couchdb, io_queue, IOClass]),
+    catch couch_stats:increment_counter([couchdb, io_queue, RW]),
+    catch couch_stats:update_histogram([couchdb, io_queue, latency], Latency),
+    update_counter(Counters, Channel, IOClass, RW),
+    State#state{reqs = [Request#request{tsub=SubmitTime, ref=Ref} | Reqs]}.
+
+update_counter(Tab, Channel, IOClass, RW) ->
+    upsert(Tab, {Channel, IOClass, RW}, 1).
+
+update_histograms(Tab, Req, TResponse) ->
+    #request{t0=T0, tsub=TSubmit, class=Class, channel=Channel, msg=Msg} = Req,
+    Delta1 = timer:now_diff(TSubmit, T0),
+    Delta2 = timer:now_diff(TResponse, TSubmit),
+    Bin1 = timebin(Delta1),
+    Bin2 = timebin(Delta2),
+    Bin3 = timebin(Delta1+Delta2),
+    if Channel =/= nil ->
+        upsert(Tab, {Channel, submit_delay, Bin1}, 1),
+        upsert(Tab, {Channel, svctm, Bin2}, 1),
+        upsert(Tab, {Channel, iowait, Bin3}, 1);
+    true -> ok end,
+    Key = make_key(Class, Msg),
+    upsert(Tab, {Key, submit_delay, Bin1}, 1),
+    upsert(Tab, {Key, svctm, Bin2}, 1),
+    upsert(Tab, {Key, iowait, Bin3}, 1).
+
+make_key(db_compact, _) ->
+    <<"compaction">>;
+make_key(view_compact, _) ->
+    <<"compaction">>;
+make_key(internal_repl, _) ->
+    <<"replication">>;
+make_key(low, _) ->
+    <<"low">>;
+make_key(view_update, _) ->
+    <<"views">>;
+make_key(db_update, _) ->
+    <<"writes">>;
+make_key(interactive, {pread_iolist, _}) ->
+    <<"reads">>;
+make_key(interactive, {append_bin, _}) ->
+    <<"writes">>;
+make_key(_, _) ->
+    <<"other">>.
+
+upsert(Tab, Key, Incr) ->
+    try ets:update_counter(Tab, Key, Incr)
+    catch error:badarg ->
+        ets:insert(Tab, {Key, Incr})
+    end.
+
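+%% Bucket a latency measured in microseconds into a logarithmic histogram
+%% bin; each bin covers one tenth of a decade.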
+timebin(V) ->
+    trunc(10*math:log10(V)).
+
+choose_next_request(Qs, Priorities) ->
+    Norm = lists:sum(Priorities),
+    QueuesAndPriorities = lists:zip(Qs, Priorities),
+    SortedQueues = sort_queues(QueuesAndPriorities, Norm, random:uniform()),
+    {Item, NewQueues} = choose_prioritized_request(SortedQueues),
+    Map0 = lists:zip(SortedQueues, NewQueues),
+    {Item, [element(2, lists:keyfind(Q, 1, Map0)) || Q <- Qs]}.
+
+choose_prioritized_request(Qs) ->
+    choose_prioritized_request(Qs, []).
+
+choose_prioritized_request([], Empties) ->
+    {nil, lists:reverse(Empties)};
+choose_prioritized_request([Q | Rest], Empties) ->
+    case queue:out(Q) of
+    {empty, _} ->
+        choose_prioritized_request(Rest, [Q | Empties]);
+    {{value, Item}, NewQ} ->
+        {Item, lists:reverse([NewQ | Empties], Rest)}
+    end.
+
+to_float("0") ->
+    0.00001;
+to_float("1") ->
+    1.0;
+to_float(String) when is_list(String) ->
+    try list_to_float(String) catch error:badarg -> 0.5 end;
+to_float(_) ->
+    0.5.
+
+dump_table(#state{counters = Tab} = State) ->
+    Pid = spawn(fun save_to_db/0),
+    ets:give_away(Tab, Pid, nil),
+    State#state{counters = ets:new(ioq_counters, [])}.
+
+save_to_db() ->
+    Timeout = get_interval(),
+    receive {'ETS-TRANSFER', Tab, _, _} ->
+        Dict = ets:foldl(fun table_fold/2, dict:new(), Tab),
+        TS = list_to_binary(iso8601_timestamp()),
+        Doc = {[
+            {<<"_id">>, TS},
+            {type, ioq},
+            {node, node()},
+            {accounts, {dict:to_list(Dict)}}
+        ]},
+        try
+            fabric:update_doc(get_stats_dbname(), Doc, [])
+        catch error:database_does_not_exist ->
+            couch_log:error("Missing IOQ stats db: ~s", [get_stats_dbname()])
+        end
+    after Timeout ->
+        error_logger:error_report({?MODULE, "ets transfer failed"})
+    end.
+
+table_fold({{other, _, _}, _}, D) ->
+    D;
+table_fold({{Channel, interactive, reads}, X}, D) ->
+    dict:update(Channel, fun([A,B,C]) -> [A+X,B,C] end, [X,0,0], D);
+table_fold({{Channel, interactive, writes}, X}, D) ->
+    dict:update(Channel, fun([A,B,C]) -> [A,B+X,C] end, [0,X,0], D);
+table_fold({{Channel, db_update, reads}, X}, D) ->
+    dict:update(Channel, fun([A,B,C]) -> [A,B+X,C] end, [0,X,0], D);
+table_fold({{Channel, db_update, writes}, X}, D) ->
+    dict:update(Channel, fun([A,B,C]) -> [A,B+X,C] end, [0,X,0], D);
+table_fold({{Channel, view_update, reads}, X}, D) ->
+    dict:update(Channel, fun([A,B,C]) -> [A,B,C+X] end, [0,0,X], D);
+table_fold({{Channel, view_update, writes}, X}, D) ->
+    dict:update(Channel, fun([A,B,C]) -> [A,B,C+X] end, [0,0,X], D);
+table_fold(_, D) ->
+    D.
+
+iso8601_timestamp() ->
+    {_,_,Micro} = Now = os:timestamp(),
+    {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
+    Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
+    io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
+
+get_interval() ->
+    case application:get_env(ioq, stats_interval) of
+    {ok, Interval} when is_integer(Interval) ->
+        Interval;
+    _ ->
+        60000
+    end.
+
+get_stats_dbname() ->
+    case application:get_env(ioq, stats_db) of
+    {ok, DbName} when is_list(DbName) ->
+        DbName;
+    _ ->
+        "stats"
+    end.
+
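+%% Split a binary shard path on "/" without going through re, returning the
+%% path segments as a list of binaries.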
+split(B) when is_binary(B) ->
+    split(B, 0, 0, []);
+split(B) -> B.
+
+split(B, O, S, Acc) ->
+    case B of
+    <<_:O/binary>> ->
+        Len = O - S,
+        <<_:S/binary, Part:Len/binary>> = B,
+        lists:reverse(Acc, [Part]);
+    <<_:O/binary, $/, _/binary>> ->
+        Len = O - S,
+        <<_:S/binary, Part:Len/binary, _/binary>> = B,
+        split(B, O+1, O+1, [Part | Acc]);
+    _ ->
+        split(B, O+1, S, Acc)
+    end.
+
+rw({pread_iolist, _}) ->
+    reads;
+rw({append_bin, _}) ->
+    writes;
+rw(_) ->
+    unknown.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+sort_queues_test() ->
+    ?assertEqual([a, b, c], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.10)),
+    ?assertEqual([a, c, b], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.45)),
+    ?assertEqual([b, a, c], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.60)),
+    ?assertEqual([b, c, a], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.65)),
+    ?assertEqual([c, a, b], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.71)),
+    ?assertEqual([c, b, a], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.90)).
+
+-endif.
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
new file mode 100644
index 0000000..5e2e01f
--- /dev/null
+++ b/src/ioq_server2.erl
@@ -0,0 +1,1068 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_server2).
+-behavior(gen_server).
+
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+-export([
+    start_link/3,
+    call/3,
+    pcall/1,
+    pcall/2
+]).
+-export([
+    get_state/0,
+    get_state/1,
+    update_config/0,
+    get_queue_depths/0,
+    get_queue_depths/1,
+    get_concurrency/0,
+    set_concurrency/1,
+    get_counters/0
+]).
+
+
+-include_lib("ioq/include/ioq.hrl").
+
+
+-define(DEFAULT_RESIZE_LIMIT, 1000).
+-define(DEFAULT_CONCURRENCY, 1).
+-define(DEFAULT_SCALE_FACTOR, 2.0).
+-define(DEFAULT_MAX_PRIORITY, 10000.0).
+
+
+-record(state, {
+    reqs :: khash:khash(),
+    waiters :: khash:khash(),
+    queue :: hqueue:hqueue(),
+    concurrency = ?DEFAULT_CONCURRENCY :: pos_integer(),
+    iterations = 0 :: non_neg_integer(),
+    class_p :: khash:khash(),  %% class priorities
+    user_p :: khash:khash(),   %% user priorities
+    shard_p :: khash:khash(),  %% shard priorities
+    scale_factor = ?DEFAULT_SCALE_FACTOR :: float(),
+    dedupe = true :: boolean(),
+    resize_limit = ?DEFAULT_RESIZE_LIMIT :: pos_integer(),
+    next_key = 1 :: pos_integer(),
+    server_name :: atom(),
+    scheduler_id = 0 :: non_neg_integer(),
+    max_priority = ?DEFAULT_MAX_PRIORITY :: float()
+}).
+
+
+-type state() :: #state{}.
+-type waiter_key() :: {pid(), integer()} | pos_integer().
+-type priority() :: float(). %% should be non_negative_float().
+
+%% Hacky queue_depth type due to existing fixed element lists for JSON in API
+%% Actual type is:
+%% [
+%%   {compaction, non_neg_integer()},
+%%   {replication, non_neg_integer()},
+%%   {low, non_neg_integer()},
+%%   {channels, {[{username(), [Interactive, DbUpdate, ViewUpdate]}]}}
+%% ]
+%% when Interactive = DbUpdate = ViewUpdate = non_neg_integer().
+-type depths() :: compaction | replication | low.
+-type user_depth() :: {binary(), [non_neg_integer()]}.
+-type depth_ele() :: {depths(), non_neg_integer()} | user_depth().
+-type queue_depths() :: [depth_ele()].
+-type read_write() :: reads | writes | unknown.
+
+
+-define(SERVER_ID(SID), list_to_atom("ioq_server_" ++ integer_to_list(SID))).
+
+
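+%% Queue an IO request for Fd. If the request's class is flagged under the
+%% "ioq2.bypass" config section, the call goes straight to Fd; otherwise the
+%% request is routed to one of the ioq_server_N processes according to the
+%% configured "dispatch_strategy" (?DISPATCH_RANDOM, ?DISPATCH_FD_HASH,
+%% ?DISPATCH_SINGLE_SERVER, or the default ?DISPATCH_SERVER_PER_SCHEDULER).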
+-spec call(pid(), term(), io_dimensions()) -> term().
+call(Fd, Msg, Dimensions) ->
+    Req0 = #ioq_request{
+        fd = Fd,
+        msg = Msg,
+        t0 = os:timestamp()
+    },
+    Req = add_request_dimensions(Req0, Dimensions),
+    Class = atom_to_list(Req#ioq_request.class),
+    case config:get_boolean("ioq2.bypass", Class, false) of
+        true ->
+            RW = rw(Msg),
+            couch_stats:increment_counter([couchdb, io_queue2, bypassed_count]),
+            couch_stats:increment_counter(
+                [couchdb, io_queue2, RW, bypassed_count]),
+            gen_server:call(Fd, Msg, infinity);
+        _ ->
+            DispatchStrategy = config:get(
+                "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER),
+            Server = case DispatchStrategy of
+                ?DISPATCH_RANDOM ->
+                    maybe_seed(),
+                    SID = random:uniform(erlang:system_info(schedulers)),
+                    ?SERVER_ID(SID);
+                ?DISPATCH_FD_HASH ->
+                    NumSchedulers = erlang:system_info(schedulers),
+                    SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
+                    ?SERVER_ID(SID);
+                ?DISPATCH_SINGLE_SERVER ->
+                    ?SERVER_ID(1);
+                _ ->
+                    SID = erlang:system_info(scheduler_id),
+                    ?SERVER_ID(SID)
+            end,
+            gen_server:call(Server, Req, infinity)
+    end.
+
+
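+%% Fan a call out to every ioq_server2 instance in parallel and collect the
+%% replies, failing as a whole if any server errors or the timeout expires.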
+-spec pcall(any()) -> any().
+pcall(Msg) ->
+    pcall(Msg, 500).
+
+
+-spec pcall(any(), non_neg_integer()) -> any().
+pcall(Msg, Timeout) ->
+    {MainPid, MainRef} = spawn_monitor(fun() ->
+        PidRefs = lists:map(fun(Name) ->
+            spawn_monitor(fun() ->
+                Resp = gen_server:call(Name, Msg, Timeout),
+                exit({resp_ok, Resp})
+            end)
+        end, ioq_sup:get_ioq2_servers()),
+        Resps = lists:map(fun({Pid, _}) ->
+            receive
+                {'DOWN', _, _, Pid, {resp_ok, Resp}} ->
+                    Resp;
+                {'DOWN', _, _, Pid, Error} ->
+                    exit(Error)
+            end
+        end, PidRefs),
+        exit({resp_ok, Resps})
+    end),
+    receive
+        {'DOWN', _, _, MainPid, {resp_ok, Resps}} ->
+            {ok, Resps};
+        {'DOWN', _, _, MainPid, Error} ->
+            {error, Error}
+    after Timeout ->
+        erlang:demonitor(MainRef, [flush]),
+        exit(MainPid, kill),
+        {error, timeout}
+    end.
+
+
+-spec get_queue_depths() -> queue_depths().
+get_queue_depths() ->
+    case pcall(get_pending_reqs, 500) of
+        {ok, PReqs} ->
+            get_queue_depths([Req || {_Priority, Req} <- lists:flatten(PReqs)]);
+        {error, _} ->
+            [
+                {compaction, ?BAD_MAGIC_NUM},
+                {replication, ?BAD_MAGIC_NUM},
+                {low, ?BAD_MAGIC_NUM},
+                {channels, {[]}}
+            ]
+    end.
+
+
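+%% Fold a list of queued requests into the legacy queue-depth format:
+%% global counters for compaction/replication/low plus per-user
+%% [Interactive, DbUpdate, ViewUpdate] counts under "channels".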
+-spec get_queue_depths([ioq_request()]) -> queue_depths().
+get_queue_depths(Reqs) ->
+    {ok, Users0} = khash:new(),
+    {Compaction, Replication, Low, Users} = lists:foldl(
+        fun
+            (#ioq_request{class=db_compact}, {C, R, L, U}) ->
+                {C+1, R, L, U};
+            (#ioq_request{class=view_compact}, {C, R, L, U}) ->
+                {C+1, R, L, U};
+            (#ioq_request{class=internal_repl}, {C, R, L, U}) ->
+                {C, R+1, L, U};
+            (#ioq_request{class=low}, {C, R, L, U}) ->
+                {C, R, L+1, U};
+            (#ioq_request{class=Class, user=User}, {C, R, L, U}) ->
+                [UI0, UDB0, UV0] = case khash:get(U, User) of
+                    undefined ->
+                        [0,0,0];
+                    UC0 ->
+                        UC0
+                end,
+                UC = case Class of
+                    db_update ->
+                        [UI0, UDB0+1, UV0];
+                    view_update ->
+                        [UI0, UDB0, UV0+1];
+                    _Interactive ->
+                        [UI0+1, UDB0, UV0]
+                end,
+                ok = khash:put(U, User, UC),
+                {C, R, L, U}
+        end,
+        {0, 0, 0, Users0},
+        Reqs
+    ),
+    [
+        {compaction, Compaction},
+        {replication, Replication},
+        {low, Low},
+        {channels, {khash:to_list(Users)}}
+    ].
+
+
+-spec get_concurrency() -> non_neg_integer().
+get_concurrency() ->
+    case pcall(get_concurrency, 500) of
+        {ok, Concurrencies} ->
+            lists:sum(Concurrencies);
+        {error, _} ->
+            ?BAD_MAGIC_NUM
+    end.
+
+
+-spec set_concurrency(non_neg_integer()) -> non_neg_integer().
+set_concurrency(C) when is_integer(C), C > 0 ->
+    lists:foldl(
+        fun(Pid, Total) ->
+            {ok, Old} = gen_server:call(Pid, {set_concurrency, C}, 1000),
+            Total + Old
+        end,
+        0,
+        ioq_sup:get_ioq2_servers()
+    );
+set_concurrency(_) ->
+    erlang:error(badarg).
+
+
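+%% Not implemented in IOQ2; always returns undefined (presumably retained
+%% for parity with the IOQ1 API).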
+get_counters() ->
+    undefined.
+
+
+%% @equiv get_state(?SERVER_ID(1))
+-spec get_state() -> any().
+get_state() ->
+    get_state(?SERVER_ID(1)).
+
+
+%% Returns a mutated #state{} with list representations of khash/hqueue objects
+-spec get_state(atom()) -> any().
+get_state(Server) ->
+    gen_server:call(Server, get_state, infinity).
+
+
+-spec update_config() -> ok.
+update_config() ->
+    gen_server:call(?SERVER_ID(1), update_config, infinity).
+
+
+start_link(Name, SID, Bind) ->
+    Options = case Bind of
+        true -> [{scheduler, SID}];
+        false -> []
+    end,
+    gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
+
+
+init([Name, SID]) ->
+    {ok, HQ} = hqueue:new(),
+    {ok, Reqs} = khash:new(),
+    {ok, Waiters} = khash:new(),
+    State = #state{
+        queue = HQ,
+        reqs = Reqs,
+        waiters = Waiters,
+        server_name = Name,
+        scheduler_id = SID
+    },
+    {ok, update_config_int(State)}.
+
+
+handle_call(get_state, _From, State) ->
+    Resp = State#state{
+        user_p = khash:to_list(State#state.user_p),
+        class_p = khash:to_list(State#state.class_p),
+        shard_p = khash:to_list(State#state.shard_p),
+        reqs = khash:to_list(State#state.reqs),
+        waiters = khash:to_list(State#state.waiters),
+        queue = hqueue:to_list(State#state.queue)
+    },
+
+    {reply, Resp, State, 0};
+handle_call(#ioq_request{} = Req, From, State) ->
+    {noreply, enqueue_request(Req#ioq_request{from=From}, State), 0};
+handle_call({hqueue, Method, Args}, _From, #state{queue=HQ}=State) ->
+    Resp = erlang:apply(hqueue, Method, [HQ | Args]),
+    {reply, Resp, State, 0};
+handle_call(update_config, _From, State) ->
+    {reply, ok, update_config_int(State), 0};
+handle_call(get_concurrency, _From, State) ->
+    {reply, State#state.concurrency, State, 0};
+handle_call({set_concurrency, C}, _From, State) when is_integer(C), C > 0 ->
+    {reply, {ok, State#state.concurrency}, State#state{concurrency = C}, 0};
+handle_call(get_reqs, _From, #state{reqs=Reqs}=State) ->
+    {reply, khash:to_list(Reqs), State, 0};
+handle_call(get_pending_reqs, _From, #state{queue=HQ}=State) ->
+    {reply, hqueue:to_list(HQ), State, 0};
+handle_call(get_counters, _From, State) ->
+    {reply, undefined, State, 0};
+handle_call(_, _From, State) ->
+    {reply, ok, State, 0}.
+
+
+handle_cast(update_config, State) ->
+    {noreply, update_config_int(State), 0};
+handle_cast(_Msg, State) ->
+    {noreply, State, 0}.
+
+
+handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
+    case khash:get(Reqs, Ref) of
+        undefined ->
+            ok;
+        #ioq_request{ref=Ref}=Req ->
+            ok = khash:del(Reqs, Ref),
+            TResponse = os:timestamp(),
+            ServiceTime = time_delta(TResponse, Req#ioq_request.tsub),
+            IOWait = time_delta(TResponse, Req#ioq_request.t0),
+            couch_stats:update_histogram(
+                [couchdb, io_queue2, svctm], ServiceTime),
+            couch_stats:update_histogram([couchdb, io_queue2, iowait], IOWait),
+            erlang:demonitor(Ref, [flush]),
+            send_response(State#state.waiters, Req, Reply)
+    end,
+    {noreply, State, 0};
+handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) ->
+    case khash:get(Reqs, Ref) of
+        undefined ->
+            ok;
+        #ioq_request{ref=Ref}=Req ->
+            couch_stats:increment_counter([couchdb, io_queue2, io_errors]),
+            ok = khash:del(Reqs, Ref),
+            send_response(State#state.waiters, Req, {'EXIT', Reason})
+    end,
+    {noreply, State, 0};
+handle_info(timeout, State) ->
+    {noreply, maybe_submit_request(State)};
+handle_info(_Info, State) ->
+    {noreply, State, 0}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+code_change(_OldVsn, #state{}=State, _Extra) ->
+    {ok, State}.
+
+
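+%% Re-read the ioq2 configuration and rebuild the class/user/shard priority
+%% tables used when prioritizing newly enqueued requests.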
+-spec update_config_int(state()) -> state().
+update_config_int(State) ->
+    Concurrency = config:get_integer("ioq2", "concurrency", ?DEFAULT_CONCURRENCY),
+    ResizeLimit = config:get_integer("ioq2", "resize_limit", ?DEFAULT_RESIZE_LIMIT),
+    DeDupe = config:get_boolean("ioq2", "dedupe", true),
+
+    ScaleFactor = ioq_config:to_float(
+        config:get("ioq2", "scale_factor"),
+        ?DEFAULT_SCALE_FACTOR
+    ),
+
+    MaxPriority = ioq_config:to_float(
+        config:get("ioq2", "max_priority"),
+        ?DEFAULT_MAX_PRIORITY
+    ),
+
+    {ok, ClassP} = ioq_config:build_class_priorities(),
+    {ok, UserP} = ioq_config:build_user_priorities(),
+    {ok, ShardP} = ioq_config:build_shard_priorities(),
+
+    State#state{
+        user_p = UserP,
+        class_p = ClassP,
+        shard_p = ShardP,
+        scale_factor = ScaleFactor,
+        concurrency = Concurrency,
+        dedupe = DeDupe,
+        resize_limit = ResizeLimit,
+        max_priority = MaxPriority
+    }.
+
+
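+%% Submit queued requests until the number of in-flight requests reaches the
+%% configured concurrency or the queue is empty (make_next_request/1 returns
+%% the state unchanged when there is nothing to dequeue).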
+-spec maybe_submit_request(state()) -> state().
+maybe_submit_request(#state{reqs=Reqs, concurrency=C}=State) ->
+    NumReqs = khash:size(Reqs),
+    case NumReqs < C of
+        true ->
+            case make_next_request(State) of
+                State ->
+                    State;
+                NewState when NumReqs >= C-1 ->
+                    NewState;
+                NewState ->
+                    maybe_submit_request(NewState)
+            end;
+        false ->
+            State
+    end.
+
+
+-spec make_next_request(state()) -> state().
+make_next_request(#state{queue=HQ}=State) ->
+    case hqueue:extract_max(HQ) of
+        {error, empty} ->
+            State;
+        {Priority, #ioq_request{} = Req} ->
+            submit_request(Req#ioq_request{fin_priority=Priority}, State)
+    end.
+
+
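+%% Monitor Fd and forward the request as an asynchronous '$gen_call'. Every
+%% resize_limit submissions the queued priorities are rescaled by
+%% scale_factor, so long-queued requests gain priority relative to newly
+%% enqueued ones.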
+-spec submit_request(ioq_request(), state()) -> state().
+submit_request(Req, #state{iterations=I, resize_limit=RL}=State) when I >= RL ->
+    ok = hqueue:scale_by(State#state.queue, State#state.scale_factor),
+    submit_request(Req, State#state{iterations=0});
+submit_request(Req, #state{iterations=Iterations}=State) ->
+    #ioq_request{
+        fd = Fd,
+        msg = Call,
+        class = Class,
+        t0 = T0
+    } = Req,
+    #state{reqs = Reqs} = State,
+
+    % make the request
+    Ref = erlang:monitor(process, Fd),
+    Fd ! {'$gen_call', {self(), Ref}, Call},
+
+    % record some stats
+    RW = rw(Call),
+
+    SubmitTime = os:timestamp(),
+    Latency = time_delta(SubmitTime, T0),
+    couch_stats:increment_counter([couchdb, io_queue2, Class, count]),
+    couch_stats:increment_counter([couchdb, io_queue2, RW, count]),
+    couch_stats:update_histogram([couchdb, io_queue2, submit_delay], Latency),
+    khash:put(Reqs, Ref, Req#ioq_request{tsub=SubmitTime, ref=Ref}),
+    State#state{iterations=Iterations+1}.
+
+
+-spec send_response(khash:khash(), ioq_request(), term()) -> [ok].
+send_response(Waiters, #ioq_request{key=Key}, Reply) ->
+    Waiting = khash:get(Waiters, Key),
+    khash:del(Waiters, Key),
+    [gen_server:reply(W, Reply) || W <- Waiting].
+
+
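+%% When dedupe is enabled, preads are keyed on {Fd, Pos} so identical reads
+%% can be coalesced into a single IO; otherwise each request gets a unique,
+%% monotonically increasing key.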
+-spec waiter_key(ioq_request(), state()) -> {waiter_key(), state()}.
+waiter_key(Req, State) ->
+    case {State#state.dedupe, Req#ioq_request.msg} of
+        {true, {pread_iolist, Pos}} ->
+            {{Req#ioq_request.fd, Pos}, State};
+        _ ->
+            Next = State#state.next_key,
+            {Next, State#state{next_key = Next + 1}}
+    end.
+
+
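+%% Insert the request into the priority queue, or, if a request with the
+%% same waiter key is already queued, just add the caller to that request's
+%% waiter list (the reply is later fanned out by send_response/3).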
+-spec enqueue_request(ioq_request(), state()) -> state().
+enqueue_request(Req, #state{queue=HQ, waiters=Waiters}=State0) ->
+    #ioq_request{
+        from = From,
+        msg = Msg
+    } = Req,
+    {ReqKey, State} = waiter_key(Req, State0),
+    RW = rw(Msg),
+
+    couch_stats:increment_counter([couchdb, io_queue2, queued]),
+    couch_stats:increment_counter([couchdb, io_queue2, RW, queued]),
+
+    case khash:get(State#state.waiters, ReqKey, not_found) of
+        not_found ->
+            Priority = prioritize_request(Req, State),
+            Req1 = Req#ioq_request{
+                key = ReqKey,
+                init_priority = Priority
+            },
+            hqueue:insert(HQ, Priority, Req1),
+            khash:put(State#state.waiters, ReqKey, [From]);
+        Pids ->
+            couch_stats:increment_counter([couchdb, io_queue2, merged]),
+            khash:put(Waiters, ReqKey, [From | Pids])
+    end,
+    State.
+
+
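+%% Attach class/shard/user/db dimensions to the request. dbcopy and security
+%% metadata requests carry no shard information; everything else is derived
+%% from the shard file name.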
+-spec add_request_dimensions(ioq_request(), io_dimensions()) -> ioq_request().
+add_request_dimensions(Request, {Class, Shard}) ->
+    add_request_dimensions(Request, {Class, Shard, undefined});
+add_request_dimensions(Request, {Class, Shard0, GroupId}) ->
+    {Shard, User, DbName} = case {Class, Shard0} of
+        {interactive, "dbcopy"} ->
+            {undefined, undefined, undefined};
+        {db_meta, security} ->
+            {undefined, undefined, undefined};
+        _ ->
+            Shard1 = filename:rootname(Shard0),
+            {User0, DbName0} = shard_info(Shard1),
+            {Shard1, User0, DbName0}
+    end,
+    Request#ioq_request{
+        shard = Shard,
+        user = User,
+        db = DbName,
+        ddoc = GroupId,
+        class = Class
+    };
+add_request_dimensions(Request, undefined) ->
+    Request#ioq_request{class=other}.
+
+
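+%% Extract {Account, DbName} from a "shards/..." path, with a special case
+%% for heroku-prefixed databases; returns {undefined, undefined} when the
+%% path does not look like a shard.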
+-spec shard_info(dbname()) -> {any(), any()}.
+shard_info(Shard) ->
+    try split(Shard) of
+        [<<"shards">>, _, <<"heroku">>, AppId, DbName] ->
+            {<<AppId/binary, ".heroku">>, DbName};
+        [<<"shards">>, _, DbName] ->
+            {system, DbName};
+        [<<"shards">>, _, Account, DbName] ->
+            {Account, DbName};
+        [<<"shards">>, _, Account | DbParts] ->
+            {Account, filename:join(DbParts)};
+        _ ->
+            {undefined, undefined}
+    catch _:_ ->
+        {undefined, undefined}
+    end.
+
+
+-spec split(binary()) -> [binary()]
+    ; ([binary()]) -> [binary()].
+split(B) when is_binary(B) ->
+    split(B, 0, 0, []);
+split(B) ->
+    B.
+
+-spec split(binary(), non_neg_integer(), non_neg_integer(), [binary()]) -> [binary()].
+split(B, O, S, Acc) ->
+    case B of
+    <<_:O/binary>> ->
+        Len = O - S,
+        <<_:S/binary, Part:Len/binary>> = B,
+        lists:reverse(Acc, [Part]);
+    <<_:O/binary, $/, _/binary>> ->
+        Len = O - S,
+        <<_:S/binary, Part:Len/binary, _/binary>> = B,
+        split(B, O+1, O+1, [Part | Acc]);
+    _ ->
+        split(B, O+1, S, Acc)
+    end.
+
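+%% Difference between two erlang:timestamp() values, in milliseconds.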
+-spec time_delta(T1, T0) -> Tdiff when
+      T1 :: erlang:timestamp(),
+      T0 :: erlang:timestamp(),
+      Tdiff :: integer().
+time_delta(T1, T0) ->
+    trunc(timer:now_diff(T1, T0) / 1000).
+
+
+-spec rw(io_dimensions()) -> read_write().
+rw({pread_iolist, _}) ->
+    reads;
+rw({append_bin, _}) ->
+    writes;
+rw({append_bin, _, _}) ->
+    writes;
+rw(_) ->
+    unknown.
+
+
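+%% Compute the multiplicative class * user * shard priority for a request
+%% and clamp it to the [0.0, max_priority] range.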
+-spec prioritize_request(ioq_request(), state()) -> priority().
+prioritize_request(Req, State) ->
+    #state{
+        class_p = ClassP,
+        user_p = UserP,
+        shard_p = ShardP,
+        max_priority = Max
+    } = State,
+    case ioq_config:prioritize(Req, ClassP, UserP, ShardP) of
+        Priority when Priority < 0.0 -> 0.0;
+        Priority when Priority > Max -> Max;
+        Priority -> Priority
+    end.
+
+
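+%% Seed the process-local RNG once, for the random dispatch strategy.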
+-spec maybe_seed() -> {integer(), integer(), integer()}.
+maybe_seed() ->
+    case get(random_seed) of
+        undefined ->
+            <<A:32, B:32, C:32>> = crypto:strong_rand_bytes(12),
+            Seed = {A, B, C},
+            random:seed(Seed),
+            Seed;
+        Seed ->
+            Seed
+    end.
+
+
+%% ioq_server2 Tests
+
+
+-ifdef(TEST).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+mock_server() ->
+    mock_server([]).
+
+
+mock_server(Config) ->
+    meck:new(config),
+    meck:expect(config, get, fun(Group) ->
+        couch_util:get_value(Group, Config, [])
+    end),
+    meck:expect(config, get, fun(_,_) ->
+        undefined
+    end),
+    meck:expect(config, get, fun("ioq2", _, Default) ->
+        Default
+    end),
+    meck:expect(config, get_integer, fun("ioq2", _, Default) ->
+        Default
+    end),
+    meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
+        Default
+    end),
+    {ok, State} = ioq_server2:init([?SERVER_ID(1), 1]),
+    State.
+
+
+unmock_server(_) ->
+    true = meck:validate(config),
+    ok = meck:unload(config).
+
+
+empty_config_test_() ->
+    {
+        "Empty config tests",
+        {
+            foreach,
+            fun mock_server/0,
+            fun unmock_server/1,
+            [
+                fun test_basic_server_config/1,
+                fun test_simple_request_priority/1,
+                fun test_simple_dedupe/1,
+                fun test_io_error/1
+            ]
+        }
+    }.
+
+
+simple_config_test_() ->
+    {
+        "Simple config tests",
+        {
+            foreach,
+            fun() ->
+                Config = [
+                    {"ioq2.classes", [{"db_compact", "0.9"}]},
+                    {"ioq2", [{"resize_limit", "10"}]}
+                ],
+                mock_server(Config)
+            end,
+            fun unmock_server/1,
+            [
+                fun test_simple_config/1,
+                fun test_auto_scale/1
+            ]
+        }
+    }.
+
+
+priority_extremes_test_() ->
+    {
+        "Test min/max priorities",
+        {
+            foreach,
+            fun() ->
+                Config = [
+                    {"ioq2.classes", [
+                        {"db_compact", "9999999.0"},
+                        {"interactive", "-0.00000000000000001"}
+                    ]}
+                ],
+                mock_server(Config)
+            end,
+            fun unmock_server/1,
+            [
+                fun test_min_max_priorities/1
+            ]
+        }
+    }.
+
+
+queue_depths_test_() ->
+    Foo = <<"foo">>,
+    Bar = <<"bar">>,
+    Reqs = [
+        #ioq_request{user=Foo, class=db_compact},
+        #ioq_request{user=Bar, class=db_compact},
+        #ioq_request{user=Bar, class=view_compact},
+        #ioq_request{user=Foo, class=internal_repl},
+        #ioq_request{user=Bar, class=internal_repl},
+        #ioq_request{user=Bar, class=internal_repl},
+        #ioq_request{class=low},
+
+        #ioq_request{user=Foo, class=interactive},
+        #ioq_request{user=Foo, class=interactive},
+        #ioq_request{user=Foo, class=db_update},
+        #ioq_request{user=Foo, class=view_update},
+        #ioq_request{user=Foo, class=view_update},
+        #ioq_request{user=Foo, class=view_update},
+        #ioq_request{user=Foo, class=view_update},
+
+        #ioq_request{user=Bar, class=interactive},
+        #ioq_request{user=Bar, class=db_update},
+        #ioq_request{user=Bar, class=db_update},
+        #ioq_request{user=Bar, class=db_update},
+        #ioq_request{user=Bar, class=view_update}
+    ],
+    Expected = [
+        {compaction, 3},
+        {replication, 3},
+        {low, 1},
+        {channels, {[
+            {<<"foo">>, [2,1,4]},
+            {<<"bar">>, [1,3,1]}
+        ]}}
+    ],
+
+    {
+        "Test queue depth stats",
+        ?_assertEqual(
+            Expected,
+            get_queue_depths(Reqs)
+        )
+    }.
+
+
+test_basic_server_config(St0) ->
+    {reply, RespState, _St1, 0} = handle_call(get_state, pid, St0),
+    [
+        ?_assertEqual([], RespState#state.user_p),
+        ?_assertEqual(
+            lists:sort(?DEFAULT_CLASS_PRIORITIES),
+            lists:sort(RespState#state.class_p)
+        ),
+        ?_assertEqual([], RespState#state.shard_p)
+    ].
+
+
+test_simple_request_priority(St0) ->
+    From = pid1,
+    Request0 = #ioq_request{class=db_compact},
+    Priority = prioritize_request(Request0, St0),
+    Request1 = Request0#ioq_request{
+        init_priority = Priority,
+        from = From,
+        key = St0#state.next_key
+    },
+    {noreply, St1, 0} = handle_call(Request0, From, St0),
+    {reply, RespState, _St2, 0} = handle_call(get_state, From, St1),
+    [
+        ?_assertEqual(
+            [{Priority, Request1}],
+            RespState#state.queue
+        )
+    ].
+
+
+test_simple_dedupe(St0) ->
+    FromA = pid1,
+    FromB = pid2,
+    Fd = fd,
+    Pos = 1234,
+    Msg = {pread_iolist, Pos},
+    Request0 = #ioq_request{
+        class=db_compact,
+        fd = Fd,
+        msg = Msg
+    },
+    {ReqKey, St1} = waiter_key(Request0, St0),
+    Priority = prioritize_request(Request0, St1),
+    Request1A = Request0#ioq_request{
+        init_priority = Priority,
+        from = FromA,
+        key = {Fd, Pos}
+    },
+    _Request1B = Request0#ioq_request{
+        init_priority = Priority,
+        from = FromB,
+        key = {Fd, Pos}
+    },
+    {noreply, St2, 0} = handle_call(Request0, FromA, St1),
+    {noreply, St3, 0} = handle_call(Request0, FromB, St2),
+    {reply, RespState, _St4, 0} = handle_call(get_state, FromA, St3),
+    [
+        ?_assertEqual(
+            [{Priority, Request1A}],
+            RespState#state.queue
+        ),
+        ?_assertEqual(
+            [{ReqKey, [FromB, FromA]}],
+            RespState#state.waiters
+        )
+    ].
+
+
+test_simple_config(St) ->
+    RequestA = #ioq_request{},
+    RequestB = #ioq_request{class=db_compact},
+    PriorityA = prioritize_request(RequestA, St),
+    PriorityB = prioritize_request(RequestB, St),
+
+    [
+        ?_assertEqual(
+            1.0,
+            PriorityA
+        ),
+        ?_assertEqual(
+            0.9,
+            PriorityB
+        )
+    ].
+
+
+test_min_max_priorities(St) ->
+    RequestA = #ioq_request{class=interactive},
+    RequestB = #ioq_request{class=db_compact},
+    PriorityA = prioritize_request(RequestA, St),
+    PriorityB = prioritize_request(RequestB, St),
+
+    [
+        ?_assertEqual(
+            0.0,
+            PriorityA
+        ),
+        ?_assertEqual(
+            ?DEFAULT_MAX_PRIORITY,
+            PriorityB
+        )
+    ].
+
+
+test_auto_scale(#state{queue=HQ}=St0) ->
+    %% start with iterations=2 so we can tell when we auto-scaled
+    St1 = St0#state{resize_limit=10, iterations=2},
+    Pid = spawn(fun() -> receive baz -> ok end end),
+    T0 = os:timestamp(),
+    BaseReq = #ioq_request{t0=T0, fd=Pid},
+
+    RequestA = BaseReq#ioq_request{ref=make_ref()},
+    RequestB = BaseReq#ioq_request{class=db_compact, ref=make_ref()},
+    PriorityA = prioritize_request(RequestA, St1),
+    PriorityB = prioritize_request(RequestB, St1),
+
+    {noreply, St2, 0} = handle_call(RequestB, Pid, St1),
+    {noreply, St3, 0} = handle_call(RequestA, Pid, St2),
+
+    {_, #ioq_request{init_priority=PriorityA2}} = hqueue:extract_max(HQ),
+    Tests0 = [?_assertEqual(PriorityA, PriorityA2)],
+    {_St, Tests} = lists:foldl(
+        fun(_N, {#state{iterations=I}=StN0, TestsN}) ->
+            ReqN = BaseReq#ioq_request{ref=make_ref()},
+            ExpectedPriority = case I == 1 of
+                false -> PriorityA;
+                true -> PriorityB
+            end,
+            {noreply, StN1, 0} = handle_call(ReqN, Pid, StN0),
+            StN2 = submit_request(ReqN, StN1),
+            {_, #ioq_request{init_priority=PriorityN}} = hqueue:extract_max(HQ),
+            {
+                StN2,
+                [?_assertEqual(ExpectedPriority, PriorityN) | TestsN]
+            }
+        end,
+        {St3, Tests0},
+        lists:seq(1, St3#state.resize_limit + 7)
+    ),
+    lists:reverse(Tests).
+
+
+all_test_() ->
+    {setup, fun setup/0, fun cleanup/1, fun instantiate/1}.
+
+
+many_clients_test_() ->
+    FDCount = 50,
+    ClientCount = 10,
+    MaxDelay = 20,
+    {
+        setup,
+        fun() -> setup_many(FDCount, MaxDelay) end,
+        fun cleanup/1,
+        fun(Servers) -> test_many_clients(Servers, ClientCount) end
+    }.
+
+
+setup() ->
+    meck:new(config, [passthrough]),
+    meck:expect(config, get_boolean,
+        fun
+            ("ioq2", "enabled", _) ->
+                true;
+            ("ioq2", "server_per_scheduler", _) ->
+                false;
+            (_, _, Default) ->
+                Default
+        end
+    ),
+    {ok, _} = application:ensure_all_started(ioq),
+    FakeServer = fun(F) ->
+        receive {'$gen_call', {Pid, Ref}, Call} ->
+            Pid ! {Ref, {reply, Call}}
+        end,
+        F(F)
+    end,
+    spawn(fun() -> FakeServer(FakeServer) end).
+
+
+setup_many(Count, RespDelay) ->
+    {ok, _} = application:ensure_all_started(ioq),
+    meck:new(config, [passthrough]),
+    meck:expect(config, get_boolean,
+        fun
+            ("ioq2", "enabled", _) ->
+                true;
+            ("ioq2", "server_per_scheduler", _) ->
+                false;
+            (_, _, Default) ->
+                Default
+        end
+    ),
+    FakeServer = fun(F) ->
+        receive {'$gen_call', {Pid, Ref}, Call} ->
+            timer:sleep(random:uniform(RespDelay)),
+            Pid ! {Ref, {reply, Call}}
+        end,
+        F(F)
+    end,
+    [spawn(fun() -> FakeServer(FakeServer) end) || _ <- lists:seq(1, Count)].
+
+
+cleanup(Server) when not is_list(Server) ->
+    cleanup([Server]);
+cleanup(Servers) ->
+    ok = application:stop(ioq),
+    true = meck:validate(config),
+    ok = meck:unload(config),
+    [exit(Server, kill) || Server <- Servers].
+
+
+instantiate(S) ->
+    Old = ?DEFAULT_CONCURRENCY * length(ioq_sup:get_ioq2_servers()),
+    [{inparallel, lists:map(fun(IOClass) ->
+        lists:map(fun(Shard) ->
+            check_call(S, make_ref(), priority(IOClass, Shard))
+        end, shards())
+    end, io_classes())},
+    ?_assertEqual(Old, ioq:set_disk_concurrency(10)),
+    ?_assertError(badarg, ioq:set_disk_concurrency(0)),
+    ?_assertError(badarg, ioq:set_disk_concurrency(-1)),
+    ?_assertError(badarg, ioq:set_disk_concurrency(foo))].
+
+
+check_call(Server, Call, Priority) ->
+    ?_assertEqual({reply, Call}, ioq_server2:call(Server, Call, Priority)).
+
+
+io_classes() -> [interactive, view_update, db_compact, view_compact,
+    internal_repl, other, db_meta].
+
+
+shards() ->
+    [
+        <<"shards/0-1/heroku/app928427/couchrest.1317609656.couch">>,
+        <<"shards/0-1/foo">>,
+        <<"shards/0-3/foo">>,
+        <<"shards/0-1/bar">>,
+        <<"shards/0-1/kocolosk/stats.1299297461.couch">>,
+        <<"shards/0-1/kocolosk/my/db.1299297457.couch">>,
+        other
+    ].
+
+
+priority(view_update, Shard) ->
+    {view_update, Shard, <<"_design/foo">>};
+priority(Any, Shard) ->
+    {Any, Shard}.
+
+
+test_many_clients(Servers, ClientCount) ->
+    ClientFun = fun() ->
+        ok = lists:foreach(fun(IOClass) ->
+            ok = lists:foreach(fun(Shard) ->
+                Server = random_server(Servers),
+                Ref = make_ref(),
+                Priority = priority(IOClass, Shard),
+                {reply, Ref} = ioq_server2:call(Server, Ref, Priority),
+                ok
+            end, shards())
+        end, io_classes()),
+        ok
+    end,
+    ok = lists:foreach(fun(_) -> spawn_monitor(ClientFun) end, lists:seq(1, ClientCount)),
+
+    Status = wait_for_success(ClientCount),
+    ?_assert(Status).
+
+
+wait_for_success(0) ->
+    true;
+wait_for_success(Count) when Count > 0 ->
+    receive
+        {'DOWN', _Ref, process, _Pid, normal} ->
+            wait_for_success(Count - 1);
+        Msg ->
+            ?debugFmt("UNEXPECTED CLIENT EXIT: ~p~n", [Msg]),
+            false
+    end.
+
+
+random_server(Servers) ->
+    lists:nth(random:uniform(length(Servers)), Servers).
+
+
+test_io_error(#state{waiters=Waiters, reqs=Reqs}=State) ->
+    Key = asdf,
+    Ref = make_ref(),
+    RefTag = make_ref(),
+    Req = #ioq_request{ref=Ref, key=Key},
+    khash:put(Waiters, Key, [{self(), RefTag}]),
+    khash:put(Reqs, Ref, Req),
+    Error = {exit, foo},
+    {noreply, _State1, 0} = handle_info({'DOWN', Ref, baz, zab, Error}, State),
+    Resp = receive
+        {RefTag, {'EXIT', Error}} ->
+            {ok, Error};
+        Else ->
+            {error, Else}
+        after 5000 ->
+            {error, timeout}
+    end,
+    ?_assertEqual({ok, Error}, Resp).
+
+
+-endif.
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index c4d04a9..7ea6284 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -13,6 +13,7 @@
 -module(ioq_sup).
 -behaviour(supervisor).
 -export([start_link/0, init/1]).
+-export([get_ioq2_servers/0]).
 
 %% Helper macro for declaring children of supervisor
 -define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
@@ -21,4 +22,28 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}.
+    ok = ioq_config_listener:subscribe(),
+    IOQ2Children = ioq_server2_children(),
+    {ok, {
+        {one_for_one, 5, 10},
+        [
+            ?CHILD(ioq_server, worker),
+            ?CHILD(ioq_osq, worker)
+            | IOQ2Children
+        ]
+    }}.
+
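+%% One ioq_server2 child per Erlang scheduler (ioq_server_1..ioq_server_N);
+%% when "bind_to_schedulers" in the "ioq2" section is true, the scheduler
+%% number is passed to ioq_server2:start_link/3 so the server can be pinned
+%% to that scheduler.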
+ioq_server2_children() ->
+    Bind = config:get_boolean("ioq2", "bind_to_schedulers", false),
+    ioq_server2_children(erlang:system_info(schedulers), Bind).
+
+ioq_server2_children(Count, Bind) ->
+    lists:map(fun(I) ->
+        Name = list_to_atom("ioq_server_" ++ integer_to_list(I)),
+        {Name, {ioq_server2, start_link, [Name, I, Bind]}, permanent, 5000, worker, [Name]}
+    end, lists:seq(1, Count)).
+
+get_ioq2_servers() ->
+    lists:map(fun(I) ->
+        list_to_atom("ioq_server_" ++ integer_to_list(I))
+    end, lists:seq(1, erlang:system_info(schedulers))).
diff --git a/test/ioq_config_tests.erl b/test/ioq_config_tests.erl
new file mode 100644
index 0000000..d3fee81
--- /dev/null
+++ b/test/ioq_config_tests.erl
@@ -0,0 +1,157 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_config_tests).
+
+
+-include_lib("ioq/include/ioq.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+
+-define(USERS_CONFIG, [{"bar","2.4"},{"baz","4.0"},{"foo","1.2"}]).
+-define(CLASSES_CONFIG, [{"view_compact","2.4"},{"db_update","1.9"}]).
+-define(SHARDS_CONFIG, [
+    {
+        {<<"shards/00000000-1fffffff/foo/pizza_db">>, db_update},
+        5.0
+    },
+    {
+        {<<"shards/00000000-1fffffff/foo/pizza_db">>, view_update},
+        7.0
+    }
+]).
+
+
+priorities_test_() ->
+    {ok, ShardP} = ioq_config:build_shard_priorities(?SHARDS_CONFIG),
+    {ok, UserP} = ioq_config:build_user_priorities(?USERS_CONFIG),
+    {ok, ClassP} = ioq_config:build_class_priorities(?CLASSES_CONFIG),
+    Tests = [
+        %% {User, Shard, Class, UP * SP * CP}
+        {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, db_update, 1.2 * 5.0 * 1.9},
+        {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, view_update, 1.2 * 7.0 * 1.0},
+        {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, view_compact, 1.2 * 1.0 * 2.4},
+        {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, db_compact, 1.2 * 1.0 * 0.0001},
+        {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, internal_repl, 1.2 * 1.0 * 0.001},
+        {<<"baz">>, undefined, internal_repl, 4 * 1.0 * 0.001}
+    ],
+    lists:map(
+        fun({User, Shard, Class, Priority}) ->
+            Req = #ioq_request{user=User, shard=Shard, class=Class},
+            ?_assertEqual(
+                Priority,
+                ioq_config:prioritize(Req, ClassP, UserP, ShardP)
+            )
+        end,
+        Tests
+    ).
+
+
+parse_shard_string_test_() ->
+    Shard = "shards/00000000-1fffffff/foo/pizza_db",
+    Classes = ["db_update", "view_update", "view_compact", "db_compact"],
+    lists:map(
+        fun(Class) ->
+            ShardString = Shard ++ "||" ++ Class,
+            ?_assertEqual(
+                {list_to_binary(Shard), list_to_existing_atom(Class)},
+                ioq_config:parse_shard_string(ShardString)
+            )
+        end,
+        Classes
+    ).
+
+
+parse_bad_string_test_() ->
+    Shard = "shards/00000000-1fffffff/foo/pizza_db$$$$$ASDF",
+    ?_assertEqual(
+        {error, Shard},
+        ioq_config:parse_shard_string(Shard)
+    ).
+
+
+to_float_test_() ->
+    Default = 123456789.0,
+    Tests = [
+        {0.0, 0},
+        {0.0, "0"},
+        {1.0, "1"},
+        {1.0, 1},
+        {7.9, 7.9},
+        {7.9, "7.9"},
+        {79.0, "79"},
+        {Default, "asdf"}
+    ],
+    [?_assertEqual(E, ioq_config:to_float(T, Default)) || {E, T} <- Tests].
+
+
+config_set_test_() ->
+    {
+        "Test ioq_config setters",
+        {
+            foreach,
+            fun() -> test_util:start_applications([config, couch_log]) end,
+            fun(_) -> test_util:stop_applications([config, couch_log]) end,
+            [
+                fun check_simple_configs/1,
+                fun check_bypass_configs/1
+            ]
+        }
+    }.
+
+
+check_simple_configs(_) ->
+    Defaults = [
+        {"concurrency", "1"},
+        {"resize_limit", "1000"},
+        {"dedupe", "true"},
+        {"scale_factor", "2.0"},
+        {"max_priority", "10000.0"},
+        {"enabled", "false"},
+        {"dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER}
+    ],
+    SetTests = [
+        {set_concurrency, 9, "9"},
+        {set_resize_limit, 8888, "8888"},
+        {set_dedupe, false, "false"},
+        {set_scale_factor, 3.14, "3.14"},
+        {set_max_priority, 99999.99, "99999.99"},
+        {set_enabled, true, "true"},
+        {set_dispatch_strategy, ?DISPATCH_FD_HASH, ?DISPATCH_FD_HASH}
+    ],
+
+    Reason = "ioq_config_tests",
+    %% Custom assert for handling floats as strings
+    Assert = fun(Expected0, Value0) ->
+        ?_assertEqual(
+            ioq_config:to_float(Expected0, Expected0),
+            ioq_config:to_float(Value0, Value0)
+        )
+    end,
+
+    Tests0 = lists:map(fun({Key, Default}) ->
+        Value = config:get("ioq2", Key, Default),
+        ?_assertEqual(Default, Value)
+    end, Defaults),
+
+    lists:foldl(fun({Fun, Value, Result}, Acc) ->
+        ok = ioq_config:Fun(Value, Reason),
+        Key = lists:sublist(atom_to_list(Fun), 5, 9999),
+        Value1 = config:get("ioq2", Key, undefined),
+        [Assert(Result, Value1) | Acc]
+    end, Tests0, SetTests).
+
+
+check_bypass_configs(_) ->
+    ok = ioq_config:set_bypass(interactive, true, "Bypassing interactive"),
+    Value = config:get_boolean("ioq2.bypass", "interactive", false),
+    ?_assertEqual(true, Value).
diff --git a/test/ioq_kv_tests.erl b/test/ioq_kv_tests.erl
new file mode 100644
index 0000000..3c34573
--- /dev/null
+++ b/test/ioq_kv_tests.erl
@@ -0,0 +1,149 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_kv_tests).
+-behaviour(proper_statem).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+
+-export([
+    initial_state/0,
+    command/1,
+    precondition/2,
+    postcondition/3,
+    next_state/3,
+    random_key/1
+]).
+
+-record(st, {kvs}).
+
+proper_test_() ->
+    PropErOpts = [
+        {to_file, user},
+        {max_size, 5},
+        {numtests, 1000}
+    ],
+    {timeout, 3600, ?_assertEqual([], proper:module(?MODULE, PropErOpts))}.
+
+
+prop_ioq_kvs_almost_any() ->
+    ?FORALL({K, V}, kvs(), begin
+        case (catch ioq_kv:put(K, V)) of
+            ok -> ioq_kv:get(K) == V;
+            {'EXIT', {invalid_term, _}} -> true;
+            _ -> false
+        end
+    end).
+
+
+prop_ioq_kvs() ->
+    ?FORALL(Cmds, commands(?MODULE),
+        begin
+            cleanup(),
+            {H, S, R} = run_commands(?MODULE, Cmds),
+            ?WHENFAIL(
+                io:format("History: ~p\nState: ~p\nRes: ~p\n", [H,S,R]),
+                R =:= ok
+            )
+        end
+    ).
+
+initial_state() ->
+    #st{kvs=dict:new()}.
+
+
+command(S) ->
+    Key = {call, ioq_kv_tests, random_key, [S]},
+    frequency([
+        {1, {call, ioq_kv, init, []}},
+        {9, {call, ioq_kv, get, [Key]}},
+        {1, {call, ioq_kv, get, [key()]}},
+        {9, {call, ioq_kv, put, [Key, val()]}},
+        {1, {call, ioq_kv, put, [key(), val()]}},
+        {2, {call, ioq_kv, delete, [Key]}},
+        {1, {call, ioq_kv, delete, [key()]}}
+    ]).
+
+
+precondition(_, _) ->
+    true.
+
+
+postcondition(_S, {call, _, init, []}, ok) ->
+    true;
+postcondition(S, {call, _, get, [Key]}, Val) ->
+    case dict:is_key(Key, S#st.kvs) of
+        true ->
+            case dict:find(Key, S#st.kvs) of
+                {ok, Val} -> true;
+                _ -> false
+            end;
+        false ->
+            case Val of
+                undefined -> true;
+                _ -> false
+            end
+    end;
+postcondition(_S, {call, _, put, [_Key, _Val]}, ok) ->
+    true;
+postcondition(_S, {call, _, delete, [_Key]}, ok) ->
+    true;
+postcondition(_S, _, _) ->
+    false.
+
+
+next_state(S, _V, {call, _, init, []}) ->
+    S;
+next_state(S, _V, {call, _, get, [_Key]}) ->
+    S;
+next_state(S, _V, {call, _, put, [Key, Val]}) ->
+    S#st{
+        kvs={call, dict, store, [Key, Val, S#st.kvs]}
+    };
+next_state(S, _V, {call, _, delete, [Key]}) ->
+    S#st{
+        kvs={call, dict, erase, [Key, S#st.kvs]}
+    }.
+
+
+random_key(#st{kvs=KVs}) ->
+    Keys0 = dict:fetch_keys(KVs),
+    Keys = lists:append(Keys0, [foo]),
+    NumKeys = erlang:length(Keys),
+    KeyPos = random:uniform(NumKeys),
+    lists:nth(KeyPos, Keys).
+
+cleanup() ->
+    code:purge(ioq_kv_dyn),
+    code:delete(ioq_kv_dyn).
+
+
+% Generators
+
+key() -> almost_any().
+val() -> almost_any().
+kvs() -> {any(), any()}.
+
+% ioq_kv can't handle storing bitstrings whose length isn't divisible
+% by 8. Rather than being clever about it, we define an almost_any()
+% generator that simply avoids them.
+almost_any() ->
+    oneof([
+        integer(),
+        float(),
+        atom(),
+        binary(),
+        ?LAZY(loose_tuple(almost_any())),
+        ?LAZY(list(almost_any()))
+    ]).
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
new file mode 100644
index 0000000..b6b7bad
--- /dev/null
+++ b/test/ioq_tests.erl
@@ -0,0 +1,68 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_tests).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all_test_() ->
+    {setup, fun setup/0, fun cleanup/1, fun instantiate/1}.
+
+setup() ->
+    Apps = test_util:start_applications([
+        config, folsom, couch_log, couch_stats, ioq
+    ]),
+    FakeServer = fun(F) ->
+        receive {'$gen_call', {Pid, Ref}, Call} ->
+            Pid ! {Ref, {reply, Call}}
+        end,
+        F(F)
+    end,
+    {Apps, spawn(fun() -> FakeServer(FakeServer) end)}.
+
+cleanup({Apps, Server}) ->
+    test_util:stop_applications(Apps),
+    exit(Server, kill).
+
+instantiate({_, S}) ->
+    [{inparallel, lists:map(fun(IOClass) ->
+        lists:map(fun(Shard) ->
+            check_call(S, make_ref(), priority(IOClass, Shard))
+        end, shards())
+    end, io_classes())},
+    ?_assertEqual(20, ioq:set_disk_concurrency(10)),
+    ?_assertError(badarg, ioq:set_disk_concurrency(0)),
+    ?_assertError(badarg, ioq:set_disk_concurrency(-1)),
+    ?_assertError(badarg, ioq:set_disk_concurrency(foo))].
+
+check_call(Server, Call, Priority) ->
+    ?_assertEqual({reply, Call}, ioq:call(Server, Call, Priority)).
+
+io_classes() -> [interactive, view_update, db_compact, view_compact,
+    internal_repl, other].
+
+shards() ->
+    [
+        <<"shards/0-1/heroku/app928427/couchrest.1317609656.couch">>,
+        <<"shards/0-1/foo">>,
+        <<"shards/0-3/foo">>,
+        <<"shards/0-1/bar">>,
+        <<"shards/0-1/kocolosk/stats.1299297461.couch">>,
+        <<"shards/0-1/kocolosk/my/db.1299297457.couch">>,
+        other
+    ].
+
+priority(view_update, Shard) ->
+    {view_update, Shard, <<"_design/foo">>};
+priority(Any, Shard) ->
+    {Any, Shard}.