Posted to commits@couchdb.apache.org by ch...@apache.org on 2019/03/13 19:17:46 UTC
[couchdb-ioq] 17/21: Import Cloudant's full IOQ code
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git
commit 907d196c30e3602fbd90014d9297d11c3a807b7d
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Jan 30 12:17:08 2019 +0000
Import Cloudant's full IOQ code
---
IOQ2.md | 812 ++++++++++++++++++++++++++++++++
README.md | 34 ++
include/ioq.hrl | 71 +++
operator_guide.md | 231 ++++++++++
priv/stats_descriptions.cfg | 230 ++++++++++
rebar.config | 11 +
src/ioq.app.src | 11 +-
src/ioq.erl | 176 ++-----
src/ioq_app.erl | 1 +
src/ioq_config.erl | 314 +++++++++++++
src/ioq_config_listener.erl | 54 +++
src/ioq_kv.erl | 169 +++++++
src/ioq_osq.erl | 253 ++++++++++
src/ioq_server.erl | 608 ++++++++++++++++++++++++
src/ioq_server2.erl | 1068 +++++++++++++++++++++++++++++++++++++++++++
src/ioq_sup.erl | 27 +-
test/ioq_config_tests.erl | 157 +++++++
test/ioq_kv_tests.erl | 149 ++++++
test/ioq_tests.erl | 68 +++
19 files changed, 4310 insertions(+), 134 deletions(-)
diff --git a/IOQ2.md b/IOQ2.md
new file mode 100644
index 0000000..ddf8d22
--- /dev/null
+++ b/IOQ2.md
@@ -0,0 +1,812 @@
+# IOQ2 Overview
+
+IOQ2 is a replacement for the original IOQ, with the core motivation to create
+a faster IOQ that eliminates the need for IOQ bypasses. This is achieved with
+two primary approaches:
+
+ 1. Faster data structures
+ 2. More processes
+
+IOQ2 also provides a more capable configuration system allowing
+(de)prioritization at the class, user, and shard/class levels. This means you
+can do things like bumping up global compaction priority, deprioritizing a
+problematic MT user, and bumping up view build priority on an individual shard.
+
+
+## Faster Data Structures
+
+One of the issues with IOQ1 is that it uses standard Erlang list and queue
+data structures for high volume modifications to the request queues and for
+tracking active requests. This quickly becomes a bottleneck and results in
+IOQ1 being roughly an order of magnitude slower than bypassing it in high
+throughput scenarios.
+
+We work around this issue in IOQ2 by introducing the `hqueue` data structure,
+which is an Erlang NIF wrapper around a simple mutable priority queue that only
+does in place updates. This queue is minimal in functionality and only supports
+floating point values as the priority sorting mechanism. The result is that
+IOQ2 focuses on a multiplicative prioritization scheme using chained
+multipliers allowing for different weights to classes, users, and shard/class
+pairs. This prioritization scheme can easily be extended later on to include
+additional attributes, such as CCM tier, or perhaps a feedback loop providing
+prioritization on overall request volumes/cost per user.
+
+
+## More Processes
+
+The introduction of `hqueue` has a considerable impact on IOQ and more than
+doubles the throughput of IOQ1. However, as mentioned above, IOQ1 bypasses
+can result in an order of magnitude performance difference, so clearly faster
+data structures alone are not sufficient to solve the problem. The main issue
+is that an Erlang process can only go as fast as a single CPU core will
+allow, so as we continually add more CPU cores to production systems, a
+single process IOQ becomes even more of a problem.
+
+Having all requests funnel through a single Erlang process will
+inevitably become a bottleneck. That said, the entire point of IOQ is that it
+*IS* a bottleneck! If there are no queued requests, then there's nothing to
+prioritize, making the use of IOQ questionable at best. The balancing act here
+is getting as close to IOQ bypass performance as we can while inducing enough
+of a bottleneck to effectively be able to prioritize different types of work.
+
+IOQ2 uses a set of IOQ2 processes to achieve similar levels of performance as
+an IOQ bypass. It creates a set of named `ioq_server_$N` processes for each
+Erlang scheduler in the VM. The caller requests are then dispatched to the
+appropriate IOQ2 server based on the current scheduler of the caller. Overall
+this works quite well and seems to be an effective way of ensuring sufficient
+request volume to have a queue backlog to prioritize, while also spreading
+load out across more schedulers as the Erlang VM does, as needed. There is
+an experimental option to bind the IOQ2 pids to the relevant schedulers, but so
+far this has not shown conclusive improvements during limited testing. Another
+potential approach here is to randomize requests to the different IOQ2 pids.
+More fine grained testing with a sharp eye on latency distributions would be
+useful here.
+
+
+# The (h)queue
+
+The new queue in IOQ2 is the `hqueue` NIF, which is a mutable heap based
+max priority queue that prioritizes on a single floating point value. What this
+means in practice is that every request gets a numeric floating point priority
+value, and is then thrown into the queue until it is popped as the max value.
+The result is a priority queue data structure that inserts new items in
+O(log(N)) and also extracts the maximum item in O(log(N)), resulting in a very
+performant data structure for the application at hand.
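+
+As a concrete illustration, driving the queue looks roughly like the
+following. This is a hedged sketch: the exact `hqueue` function names and
+return shapes are assumed here for illustration, so consult the `hqueue`
+module for the precise API.
+
+```erlang
+%% Assumed API sketch: insert two requests, then pop the max priority item.
+{ok, HQ} = hqueue:new(),
+hqueue:insert(HQ, 1.0, interactive_req),    % normal priority
+hqueue:insert(HQ, 0.0001, compaction_req),  % background priority
+{1.0, interactive_req} = hqueue:extract_max(HQ).
+```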
+
+
+## Sample Prioritizations
+
+The current prioritization scheme is a simple set of multipliers for the
+various dimensions. Currently there are three dimensions:
+
+ * Class: eg `interactive`, `db_compact`, `view_update`, etc
+ * User: the user making the request, eg `<<"foo">>`
+ * Shard/Class pair: the shard and class for the request
+ - eg `{<<"shards/00000000-1fffffff/foo">>, interactive}`
+ - this allows for things like increased compaction priority on an
+ individual shard outside of the global class multipliers
+
+Behind the scenes, this basically works as follows:
+
+```erlang
+prioritize_request(Req) ->
+ UserPriority = user_priority(Req),
+ ClassPriority = class_priority(Req),
+ ShardClassPriority = shard_class_priority(Req),
+
+ UserPriority * ClassPriority * ShardClassPriority.
+```
+
+The default priority is the identity priority, 1.0, so in the case where no
+values are defined the product above would be 1.0 * 1.0 * 1.0 = 1.0. The
+default class priorities are currently defined as follows in `ioq.hrl`:
+
+
+### Default Class Priorities
+
+```erlang
+-define(DEFAULT_CLASS_PRIORITIES, [
+ {customer, 1.0},
+ {internal_repl, 0.001},
+ {view_compact, 0.0001},
+ {db_compact, 0.0001},
+ {low, 0.0001},
+ {db_meta, 1.0},
+
+ {db_update, 1.0},
+ {view_update, 1.0},
+ {other, 1.0},
+ {interactive, 1.0}
+]).
+```
+
+
+## Handling Priority Starvation
+
+One potential problem with the simple floating point based priority queue is
+that lower priority items can become starved given a constant volume of higher
+priority requests. We want to ensure that requests can't get permanently
+starved in this manner, and that work progresses on all fronts in a timely
+fashion. IOQ1 handles this issue by ensuring there's always a random chance low
+priority work will be executed.
+
+In IOQ2 this issue is handled by way of an auto scaling elevator on the
+priority values. What this means is that every `N` requests, IOQ2 will scale
+the existing queued items by a configurable scaling factor. The idea is that
+you automatically increase the priority of all queued items, and if you do that
+enough times then lower priority items will eventually bubble up to the top.
+Behind the scenes hqueue is an array based heap, so we can easily run through
+the array and update the priority of each item. By scaling the priority of
+each item linearly, we preserve the sorted order invariant of the elements in
+the heap and can accomplish this without needing to re-sort the heap.
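+
+Conceptually the elevator pass is just a linear map over the heap's backing
+array. Here is a minimal sketch in plain Erlang of why this is safe (this
+models the NIF behavior, it is not the actual C implementation): multiplying
+every priority by the same positive factor cannot change the relative order
+of any two elements.
+
+```erlang
+%% Sketch: scale every queued priority by a positive factor. Since
+%% P1 =< P2 implies P1 * F =< P2 * F for F > 0, the heap property is
+%% preserved and no re-sort is required.
+scale_heap(Heap, Factor) when Factor > 0 ->
+    [{Priority * Factor, Item} || {Priority, Item} <- Heap].
+```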
+
+The default scale factor is currently `2.0`, and the default `N` value for how
+often to scale is currently every `1000` requests. Both of these values are
+config knobs. In general these values seem _ok_, but they're not particularly
+scientific, so we'll want to keep an eye on them over time in a variety of
+workloads.
+
+
+## Intentional Priority Starvation
+
+Initially, IOQ2 and hqueue required all priorities to be greater than zero, but
+this has been switched to be greater than or equal to zero. The motivation here
+is that 0.0 has the cute property of propagating through any multipliers. This
+means a zero value for any of the dimensions will make the other dimensions
+irrelevant. But what's even more interesting is that zero priority values skip
+the auto scaling elevator and will forever be stuck at zero, which provides a
+way to intentionally starve particular work types, or at the very least to
+ensure that it will never be selected unless there is no other work to do.
+This is especially useful for blackballing problematic MT users, or for
+marking a particular database as background only work.
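+
+For example, using the `ioq_config:set_user_config` helper (the user name
+here is hypothetical):
+
+```erlang
+ioq_config:set_user_config(<<"noisy_mt_user">>, 0.0, "Blackball noisy user").
+```
+
+Because `0.0` propagates through the multipliers and skips the elevator,
+these requests only run when no other work is queued.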
+
+
+---
+
+
+# IOQ2 Operations Guide
+
+IOQ2 comes with a feature toggle, and is disabled by default. You can enable it
+with:
+
+
+## Enable IOQ2
+
+```erlang
+ioq_config:set_enabled(true, "Enabling IOQ2").
+```
+
+You can verify it's enabled by checking:
+
+```erlang
+ioq:ioq2_enabled().
+```
+
+
+## Metrics Dashboard
+
+IOQ2 has a dedicated dashboard page on `metrics.cloudant.com` with the
+expected tab name of `IOQ2`.
+
+
+---
+
+
+## Choosing Good Priority Values
+
+Choosing good priority values is going to be a crucial part of tuning IOQ2.
+Unfortunately this is not particularly obvious nor necessarily easy, and it will
+take some experimentation under different workloads to begin establishing some
+best practices. Hopefully folks can start updating this section with some
+useful tips and tricks for different workloads. Below there's more
+documentation on how to validate the priority configs for different request
+types. The primary motivation for adding that logic was to help facilitate
+experimentation of different configuration options, and to aid in understanding
+how the different configurations impact request prioritization.
+
+It will be useful to keep in mind the ranges of priority values. By default all
+interactive/view_update/db_update/other class requests have a priority of
+`1.0`, and assuming no user specific or shard specific configs, those requests
+will have a priority of `1.0`. Similarly, standard background tasks have much
+lower defaults: compaction defaults to `0.0001` and internal replication to
+`0.001`, so primary database operations by default are prioritized a thousand
+to ten thousand fold over background tasks. The default bounds of
+prioritization are from `0.0` to `10000.0`, so you have a decent field to
+experiment with, and the upper bound
+can be configured higher or lower as desired.
+
+It's also important to remember the auto scaling elevator logic for prioritized
+requests. Every `N` requests, all currently queued requests have their
+priorities linearly scaled, so after a sufficiently long time in the queue,
+all requests (with non `0.0` priorities) will eventually become the top
+priority (assuming constant incoming priorities). The scaling factor can be
+configured as well as how often to do the scaling.
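+
+To get a feel for the timescales involved, here's a worked example with the
+defaults (scale factor `2.0`, resize limit `1000`): a `db_compact` request
+queued at priority `0.0001` overtakes freshly arriving interactive requests
+at `1.0` once it has survived 14 elevator passes, i.e. roughly 14,000
+requests later.
+
+```erlang
+%% 0.0001 * 2^13 = 0.8192  (still below fresh interactive requests)
+%% 0.0001 * 2^14 = 1.6384  (now above them)
+0.0001 * math:pow(2.0, 14).
+```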
+
+So let's look at some real world scenarios where you would want to change
+prioritization. Let's start with a simple one: what to do when a node is at
+95% disk space? This is an easy one! Just blast compaction priority. Unlike
+IOQ1, IOQ2 does not differentiate between request types when selecting the
+next piece of work; it is strictly based on the priority multiplier, so you
+can completely prioritize compaction traffic over standard database
+operations, potentially to the detriment of standard operations performance.
+So if you set the compaction priority multiplier to 10000 you'll prioritize
+compaction work above everything else (assuming default priorities
+elsewhere). This means that as long as there are compaction jobs in the
+queue, those will be handled before anything else. This should be a
+significant win for prioritizing compaction in low disk space scenarios.
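+
+Using the class config helper, blasting compaction priority looks like the
+following (the reason string is just an example):
+
+```erlang
+ioq_config:set_class_config(db_compact, 10000.0, "95% disk, drain compaction").
+```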
+
+Now, let's look at a similar, albeit more complicated scenario. Disk space is
+above 85%, and you want to get out ahead of the compaction curve without
+severely impacting cluster performance for standard operations. Cranking
+compaction to the limit will get the job done, but it will also potentially
+induce performance issues for the customer and could starve normal requests.
+Here you would want to experiment a bit with gradually bumping up the
+compaction priority. Try going from the `0.0001` default to `0.001` and see how
+much that increases compaction throughput. Still not enough? Try `0.01` and
+repeat. Then move on to `0.1` and maybe even `1.0` to make compaction priority
+level with normal operations.
+
+One other thing to keep in mind here is that these prioritizations are also
+dependent on the overall request volumes. If you've got 10k pending db_update
+requests, and only 5 pending compaction requests, then cranking compaction
+priority is not going to have a massive negative impact on db_update
+throughput. Similarly, if you've only got one compaction job running, you'll
+run into diminishing returns for how effectively you can prioritize compaction
+as there's insufficient request volume to prioritize. You'll need to experiment
+with increasing Smoosh concurrency to get more jobs running to have more queued
+items to prioritize.
+
+
+## Setting Priority Values
+
+IOQ2 contains utility functions for setting all configuration related values;
+you should *not* need to use `config:set` for any IOQ2 related configuration
+changes. In addition to the various config tunables, there are builtin helpers
+to assist with setting appropriate priorities. All priority values should be
+floating point values, and the described config helpers will prevent you from
+using non floating point values. All of the config helpers here expect a
+`Reason` value to log changes as per standard auditing rules.
+
+*NOTE* the shard priorities do not include the shard suffix in the config names
+to preserve priorities between cycled dbs, so if for whatever reason you
+manually set shard priorities, make sure you use `filename:rootname(ShardName)`
+to drop the suffix so that your config options work as expected.
+
+
+### Setting Class Priority Values
+
+Class specific priority multipliers can be set as demonstrated below. The class
+name should be an Erlang atom, and the value should be a float. For example:
+
+```erlang
+ioq_config:set_class_config(interactive, 3.2, "Insightful motivation").
+ioq_config:set_class_config(db_compact, 0.5, "Moar compactions").
+```
+
+
+### Setting Shard/Class Specific Priority Values
+
+You can prioritize on Shard/Class pairs; there is no shard wide
+prioritization, so you'll need to set each class as appropriate. This
+function takes a `#shard{}` record as returned by `mem3:shards`.
+
+```erlang
+Shard = hd(mem3:shards(DbName)),
+ioq_config:set_shard_config(Shard, db_update, 2.3, "Prioritize db updates").
+```
+
+You _could_ call `set_shard_config` for every shard for a given database, but
+there's a helper for that as well:
+
+
+### Setting Shard/Class Priority for all Shards in a Database
+
+There's a helper function for setting a class priority on all shards for a
+given database name. Similarly to the shard/class configs, you have to specify
+each class priority individually. You can use it as follows:
+
+```erlang
+ioq_config:set_db_config(<<"foo/bar">>, view_update, 0.8, "Build that view").
+```
+
+This is roughly equivalent to:
+
+```erlang
+[set_shard_config(S, Class, Value, Reason) || S <- mem3:shards(DbName)].
+```
+
+
+### Setting User Specific Priority Values
+
+You can set a global multiplier for a particular user to increase or decrease
+the priority of all of their requests. Here's an example:
+
+```erlang
+ioq_config:set_user_config(<<"foo">>, 3.7, "Priority user").
+```
+
+The `set_user_config` function currently does not validate that the user
+exists, so you'll want to verify that you set the config for the appropriate
+user.
+
+
+### Verifying Expected Prioritization Matches up with Reality
+
+Ok great, so you've just used the handy helpers to set various priority values,
+but how do you verify it did what you expect? How do you test to see that the
+multipliers result in a prioritization in the desired range? The prioritization
+logic is self contained and can easily be experimented with. You have two
+options, either with the `check_priority/3` function, or by using the
+`prioritize` function directly. The `check_priority` function is the simpler
+approach, and saves you from having to build up the relevant priority data
+structures. It can be used as follows:
+
+```erlang
+User = <<"foo">>,
+DbName = <<"foo/bar">>,
+Shard = hd(mem3:shards(DbName)),
+ioq_config:check_priority(internal_repl, User, Shard).
+```
+
+That will return the floating point prioritization value for that request. You
+can also experiment with your own config options directly by way of using the
+`ioq_config:prioritize` function. To demonstrate an example of using this,
+here is the source code for the `check_priority` function used above:
+
+```erlang
+-spec check_priority(atom(), binary(), binary()) -> float().
+check_priority(Class, User, Shard0) ->
+ {ok, ClassP} = build_class_priorities(),
+ {ok, UserP} = build_user_priorities(),
+ {ok, ShardP} = build_shard_priorities(),
+
+ Shard = filename:rootname(Shard0),
+ Req = #ioq_request{
+ user = User,
+ shard = Shard,
+ class = Class
+ },
+
+ prioritize(Req, ClassP, UserP, ShardP).
+```
+
+The `build_*_priorities()` functions are all exported from the `ioq_config`
+module and are directly usable for easy testing. You can also see the full list
+of priority values from those priority data structures like so:
+
+```erlang
+(node1@127.0.0.1)14> khash:to_list(ShardP).
+[{{<<"shards/00000000-1fffffff/foo">>,interactive},1.0e3},
+ {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update},
+ 1.5}]
+```
+
+
+---
+
+
+## Other Configuration Knobs
+
+There are a number of other configuration knobs available, and they're detailed
+below.
+
+
+### IOQ2 Concurrency
+
+We'll start out with one of the more awkward configuration tunables:
+concurrency! This option is awkward because it fundamentally changes the
+dynamics of IOQ, both IOQ1 and IOQ2. If concurrency is higher than the number
+of parallel requests, then you'll never actually prioritize things and using
+IOQ is a waste. If it's too low then you'll overly bottleneck the system and
+cause a backup of requests.
+
+This awkwardness is further compounded by the fact that IOQ2 is inherently
+concurrent in that it has one IOQ2 pid per scheduler, so you must exercise
+caution with setting the concurrency value! This value is propagated to every
+IOQ2 pid, so setting concurrency is essentially multiplicative, and total
+concurrency will be `concurrency * num_schedulers`. Most of our newer systems
+now have 48 cores, and we have systems with 56 cores now, so setting IOQ2
+concurrency to 5 could result in a total concurrency of 250+!!! The
+`ioq:get_disk_concurrency()` function (which calls
+`ioq_server2:get_concurrency()` when IOQ2 is enabled) will aggregate these
+concurrency values together, giving you the full total, so that's useful to
+double check.
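+
+For example, a quick sanity check on a node is to compare the aggregate
+against the scheduler count:
+
+```erlang
+%% With the default per-pid concurrency of 1 these two should match.
+Total = ioq:get_disk_concurrency(),
+Schedulers = erlang:system_info(schedulers).
+```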
+
+Interestingly enough, the best concurrency value found so far through
+empirical means is concurrency=1 per IOQ2 pid! This is the current default,
+and so on machines with 48 cores we end up with a total concurrency of 48. So
+far the default of one has been fairly effective, but given sufficient volume
+of requests it might be worthwhile to bump it up. Start small and try bumping
+it up to two, or maybe three. For example:
+
+```erlang
+ioq_config:set_concurrency(2, "Bumping concurrency").
+```
+
+*NOTE* if you do feel the need to update concurrency, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Resize Limit
+
+The resize limit value controls how many requests to handle before triggering
+the auto scaling elevator logic described above. This defaults to 1000, and can
+be changed with:
+
+```erlang
+ioq_config:set_resize_limit(5000, "Huge resize limit test").
+```
+
+*NOTE* if you do feel the need to update resize_limit, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Scale Factor
+
+The scale factor defines the multiplier to use during auto scaling when the
+resize limit is hit. This currently defaults to two, which means that after
+ten auto scale iterations you'll have increased the priority roughly one
+thousand fold for any requests that have been in the queue for all ten
+cycles. This value may or
+may not be too aggressive. Setting this to one essentially eliminates the auto
+scaling elevator logic entirely, which is not really recommended. You can
+update it as follows:
+
+```erlang
+ioq_config:set_scale_factor(1.7, "Modifying scale factor").
+```
+
+*NOTE* if you do feel the need to update scale_factor, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Max Priority
+
+Max priority establishes an upper bound on the priority values. It currently
+defaults to 10000.0. There is also an implicit lower bound on priority values
+of 0.0. Depending on how wild you go with the multipliers, it might be useful
+to increase this value, which can be done with the following:
+
+```erlang
+ioq_config:set_max_priority(55555.0, "Expand priority space").
+```
+
+### IOQ2 Dedupe
+
+Both IOQ1 and IOQ2 have a dedupe feature that will avoid performing the same
+read multiple times in parallel. In IOQ1 this operation scanned through lists
+and could become a considerable resource hog. In IOQ2 this is a simple khash
+lookup and should not be a problem. You should *not* ever need to disable
+this. If for whatever reason you do, you can do so with:
+
+```erlang
+ioq_config:set_dedupe(false, "Disable dedupe test").
+```
+
+*NOTE* if you do feel the need to update dedupe, please do notify
+@chewbranca afterwards so we can observe this in more workloads.
+
+
+### IOQ2 Bypass
+
+IOQ2 has the same bypass logic as IOQ1, however, the whole point of IOQ2 is to
+make a sufficiently performant IOQ that bypasses are *not* necessary. This
+functionality was included in IOQ2 as a backup in case max throughput is
+essential and unreachable with IOQ2. You can set it in the standard manner, but
+in the `ioq2.bypass` namespace as follows:
+
+```erlang
+ioq_config:set_bypass(interactive, true, "Bypass interactive channel").
+```
+
+*NOTE* if you do feel the need to bypass IOQ2, please do notify
+@chewbranca afterwards so we can observe this in more workloads. Yes, this
+NOTE blurb is in a number of config descriptions, but _please_ do notify
+@chewbranca if you feel the need to bypass anything.
+
+
+### IOQ2 Dispatch Strategy
+
+IOQ2 utilizes many processes to achieve the desired throughput and performance.
+There are several different dispatch strategies for determining how requests are
+funneled through these IOQ pids, and a `single_server` fallback in the event
+only a single IOQ2 server is desired. Changing dispatch strategies is a *safe*
+operation to perform: all the pids already exist, and it will just toggle
+which ones requests go through. All active requests will continue to go
+through the IOQ2 pid they were initially handled by, and any new requests
+will go through the specified IOQ2 server. The four current dispatch
+strategies are:
+
+ * "server_per_scheduler"
+ * "fd_hash"
+ * "random"
+ * "single_server"
+
+
+### Server per Scheduler Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("server_per_scheduler", "Changing dispatch").
+```
+
+This is the default dispatch strategy. IOQ2 creates `N` `ioq_server2` pids, where
+`N` is the number of Erlang VM schedulers on the current system, which defaults
+to the number of CPU cores. This dispatch strategy uses the current scheduler
+of the caller process to determine which IOQ2 server to use. This has
+the nice property of automatically distributing work out across IOQ2 servers
+based on how the Erlang VM is spreading out work across the schedulers. In
+practice this works pretty well and seems reasonable at a high level, but it may
+or may not be optimal for all workloads, which is why we have multiple dispatch
+strategies.
+
+
+### FD Hash Scheduler Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("fd_hash", "Changing dispatch").
+```
+
+The `fd_hash` dispatch strategy hashes on the couch_file pid the request has as
+a destination, and then ensures that all requests to the same couch_file pid go
+through a single IOQ2 pid. This provides the most control over prioritization of
+requests to individual shards, as _all_ requests to that shard will go through
+the single IOQ2 pid, providing global prioritization rather than localized by
+IOQ2 pid. This can be useful when dealing with overloaded couch_file pids
+where you want to minimize and focus the work sent to those pids. Also, by
+funneling all requests to the same shard through the same IOQ2 pid, this
+increases the
+opportunity for deduping requests, which can be significant. This dispatch
+strategy can result in uneven distribution of work across IOQ2 pids, so it's not
+appropriate for all situations, but for many dedicated clusters this could be an
+ideal dispatch strategy.
+
+
+### Random Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("random", "Changing dispatch").
+```
+
+The `random` dispatch strategy just randomly selects one of the IOQ2 pids to
+send the request to. This dispatch strategy uses a uniform random
+distribution and should result in roughly even work distributed across all
+IOQ2 pids. This is not the default strategy because if there are fewer
+concurrent requests active in the system than total IOQ2 pids, there will not
+actually be any prioritization taking place, in which case the
+`server_per_scheduler` dispatch strategy should be preferred, as it will
+reduce the number of IOQ2 pids in use as a function of how much work is on
+the system.
+
+
+### Single Server Dispatch Strategy
+
+```erlang
+ioq_config:set_dispatch_strategy("single_server", "Changing dispatch").
+```
+
+This is a fallback dispatch strategy that may or may not be removed at some
+point. This utilizes a single IOQ2 pid for _all_ requests, eliminating the
+benefits of parallel IOQ2 pids and inevitably resulting in IOQ2 becoming a
+bottleneck in the same way as IOQ1, albeit a faster bottleneck.
+
+
+---
+
+
+## Finding the pids
+
+The IOQ2 pids per scheduler have registered names of the form `ioq_server_$N`
+where `$N` is the scheduler id, starting from 1. You can get a list of all the
+IOQ2 pids on the current system with the following:
+
+```erlang
+(node1@127.0.0.1)10> ioq_sup:ioq_server_pids().
+[ioq_server_1,ioq_server_2]
+```
+
+
+## ioq_config:ioq_classes
+
+You can see the proper names for all registered IOQ classes with the following:
+
+```erlang
+(node1@127.0.0.1)15> ioq_config:ioq_classes().
+[customer,internal_repl,view_compact,db_compact,low,db_meta,
+ db_update,view_update,other,interactive]
+```
+
+
+## ioq_server2:get_state
+
+You can see a human readable representation of the IOQ2 server state with the
+following block of code. The output is "human readable" in that the khash and
+hqueue data structures have been transformed into lists so the contents can be
+viewed. This fetches the state of the `ioq_server_1` pid. If you want a
+different pid you'll need to manually `gen_server:call` into it.
+
+```erlang
+(node1@127.0.0.1)16> ioq_server2:get_state().
+{state,[],[],[],1,0,
+ [{view_update,1.0},
+ {view_compact,0.0001},
+ {db_compact,0.0001},
+ {low,0.0001},
+ {db_update,1.0},
+ {customer,1.0},
+ {internal_repl,0.001},
+ {interactive,1.0},
+ {other,1.0},
+ {db_meta,1.0}],
+ [],
+ [{{<<"shards/00000000-1fffffff/foo">>,interactive},1.0e3},
+ {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update},
+ 1.5}],
+ 2.0,true,1000,1,ioq_server_1,0,normal,1.0e4}
+```
+
+If you want to have the pretty printed version and be able to fetch the fields
+directly, you'll need to include the `ioq_server2` records, for example:
+
+```erlang
+(node1@127.0.0.1)17> rr(ioq_server2), ioq_server2:get_state().
+#state{reqs = [],waiters = [],queue = [],concurrency = 1,
+ iterations = 0,
+ class_p = [{view_update,1.0},
+ {view_compact,0.0001},
+ {db_compact,0.0001},
+ {low,0.0001},
+ {db_update,1.0},
+ {customer,1.0},
+ {internal_repl,0.001},
+ {interactive,1.0},
+ {other,1.0},
+ {db_meta,1.0}],
+ user_p = [],
+ shard_p = [{{<<"shards/00000000-1fffffff/foo">>,interactive},
+ 1.0e3},
+ {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update},
+ 1.5}],
+ scale_factor = 2.0,dedupe = true,resize_limit = 1000,
+ next_key = 1,server_name = ioq_server_1,scheduler_id = 1,
+ collect_stats = normal,max_priority = 1.0e4}
+```
+
+To fetch the server state from a particular IOQ2 pid, you can do so with the
+following:
+
+```erlang
+(node1@127.0.0.1)18> gen_server:call(ioq_server_2, get_state).
+#state{reqs = [],waiters = [],queue = [],concurrency = 1,
+ iterations = 0,
+ class_p = [{view_update,1.0},
+ {view_compact,0.0001},
+ {db_compact,0.0001},
+ {low,0.0001},
+ {db_update,1.0},
+ {customer,1.0},
+ {internal_repl,0.001},
+ {interactive,1.0},
+ {other,1.0},
+ {db_meta,1.0}],
+ user_p = [],
+ shard_p = [{{<<"shards/00000000-1fffffff/foo">>,interactive},
+ 1.0e3},
+ {{<<"shards/00000000-1fffffff/foo/pizza_db">>,db_update},
+ 1.5}],
+ scale_factor = 2.0,dedupe = true,resize_limit = 1000,
+ next_key = 1,server_name = ioq_server_2,scheduler_id = 2,
+ collect_stats = normal,max_priority = 1.0e4}
+```
+
+
+---
+
+
+## Gotchas
+
+Some miscellaneous "gotchas" to be aware of.
+
+
+### Shard Class Configs Drop Suffixes
+
+By default the shard names include the db file suffix, which is the creation
+timestamp. These suffixes must be dropped from the config entries,
+otherwise they will not be picked up in the IOQ2 config. For instance, here's
+what the default name looks like, followed by properly truncating the suffix:
+
+```erlang
+(node1@127.0.0.1)21> S#shard.name.
+<<"shards/00000000-1fffffff/foo.1503945430">>
+(node1@127.0.0.1)22> filename:rootname(S#shard.name).
+<<"shards/00000000-1fffffff/foo">>
+```
+
+
+### Shard Configs Persist Through Db Cycles, but not Reshards
+
+In the `Shard Class Configs Drop Suffixes` "gotcha" above, you'll see that
+suffixes are not an allowed part of the shard config keys. The motivation here
+is to allow database configs to persist through cycles, eg deleting and
+recreating a database will preserve the config.
+
+*HOWEVER*, if you delete the database and recreate it with a new sharding
+factor, you'll end up with a completely different set of shards; the old
+configs will no longer map over, and priorities will essentially reset to the
+defaults.
+This should be obvious given that the shard configs are keyed on the full shard
+name, so switching said shard name will result in a different config key.
+You'll need to manually reset the configs with the appropriate new shards once
+the database has been recreated or resharded.
+
+
+### Non integer/float Priority Configs are Ignored
+
+If you set a config value that is not an integer or floating point value, that
+configuration will be silently ignored and replaced with the default value of
+`1.0`. Check out the docs above about how to verify config expectations. The
+`ioq_config` helpers described above will only allow you to set configs with
+floating point values, so if you only use those this should never be a problem.
+However if you manually set the config values you might run into this. The
+IOQ2 config will attempt to convert integers to floats, but any other values
+will be ignored.
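+
+For reference, the coercion behavior described above amounts to something
+like the following (a hypothetical helper for illustration, not the actual
+`ioq_config` code):
+
+```erlang
+%% Integers are converted to floats; anything else is ignored and the
+%% default multiplier of 1.0 is used instead.
+to_priority(P) when is_float(P) -> P;
+to_priority(P) when is_integer(P) -> float(P);
+to_priority(_) -> 1.0.
+```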
+
+
+### Only a subset of metrics reducers are enabled
+
+The https://metrics.cloudant.com tabs are predominantly powered by Centinela
+reducers that aggregate the node-specific metrics into a single global
+metric. Due to the current precarious state of the metrics stack, the IOQ2
+work has been cautious with the introduction of new metrics. Only the reduced
+metrics currently enabled on the IOQ2 tab have reducers enabled. If you need
+additional reducers you'll need to add them. For example, the iowait metrics
+are reduced on median but not P99.9:
+
+```
+(chewbranca)-(jobs:0)-(~)
+(! 14529)-> acurl
+https://cloudant.cloudant.com/metrics/couchdb.io_queue2.iowait.percentile.50
+{"_id":"couchdb.io_queue2.iowait.percentile.50","_rev":"1-e0ad9472a61b9a46ace4c9852ce63a36","reduce":true,"reducers":["mean"]}
+
+(chewbranca)-(jobs:0)-(~)
+(! 14530)-> acurl
+https://cloudant.cloudant.com/metrics/couchdb.io_queue2.iowait.percentile.999
+{"error":"not_found","reason":"missing"}
+```
+
+
+### IOQ2 Concurrency is Multiplicative
+
+This is covered in depth in the concurrency sections above, but it is an
+important enough point to warrant calling out here as well. With IOQ1,
+concurrency applies to the single IOQ1 pid, but in IOQ2 concurrency is set
+for *every* IOQ2 pid. This means you should realistically never set
+concurrency above single digits for IOQ2. Setting concurrency to five on a
+system with 56 CPU cores will result in a total concurrency of over 250,
+which is probably not productive.
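+
+As a quick sanity check in a remsh, the effective total can be estimated,
+assuming the `server_per_scheduler` dispatch strategy (one IOQ2 pid per
+scheduler):
+
+```erlang
+%% Per-pid concurrency multiplied by the number of scheduler-bound IOQ2 pids
+(node1@127.0.0.1)25> erlang:system_info(schedulers) * ioq:get_disk_concurrency().
+```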
+
+
+### IOQ2 Can Only Prioritize Work when there's a Backlog of Work
+
+As mentioned above, IOQ is inherently a bottleneck: if it isn't one, it
+can't actually prioritize any work. On a similar note, if there's an
+insufficient volume of a particular request type, you won't be able to
+significantly influence the throughput of that type. For instance, you can
+bump compaction priority to the moon, but if there's only one compaction job
+you're not going to make a significant difference to the volume of
+compaction requests.
+
+The other side of this is that IOQ2 is only effective at prioritizing work
+when there's a variety of work types. If you've got a problematic MT user
+that you want to back-burner, you can set the user priority to `0.0`, and all
+their requests will be prioritized at `0.0` and will never benefit from the
+auto scaling elevator. That said, if there are _no_ other users making
+requests to the system, then that user's requests will still be chugging
+along as fast as they come in. IOQ2 is *NOT* a rate limiting system; it's a
+prioritization system that ranks requests relative to all other pending
+requests. Without sufficient work it's essentially just a pass-through.
+
+
+---
+
+
+## Request for feedback
+
+IOQ and IOQ2 are complicated beasts, and there are a lot of tunable knobs
+here. It is expected that we'll need to experiment with different levels for
+different workloads, so please be diligent about informing @chewbranca of any
+situation where you've had to change one of the configuration options above
+that request notification. Any other thoughts/comments/suggestions are
+welcome, as is feedback on the IOQ2 metrics tab.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ccd676c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,34 @@
+### IOQ classes
+The following is the list of IOQ classes:
+
+* interactive
+* db_update
+* view_update
+* db_compact
+* view_compact
+* internal_repl
+* low
+
+
+### Bypassing IOQ
+One can configure an ioq bypass, which removes an IO class from prioritization,
+as below:
+
+ config:set("ioq.bypass", "view_update", "true")
+
+Note that setting an IOQ bypass can effectively let that class trump all
+other classes, especially in the case of an interactive bypass vs.
+compaction. This can lead to high disk usage.
+
+### Setting priorities
+The priority for a class can also be set like so:
+
+ config:set("ioq", "compaction", "0.3")
+
+Or globally, using snippet/rpc:
+
+ s:set_config("ioq", "compaction", "0.314", global)
+ rpc:multicall(config, set, ["ioq", "compaction", "0.217"])
+
+As the interactive class is 'everything else', its priority cannot be set
+directly.
diff --git a/include/ioq.hrl b/include/ioq.hrl
new file mode 100644
index 0000000..55c95f8
--- /dev/null
+++ b/include/ioq.hrl
@@ -0,0 +1,71 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(DEFAULT_PRIORITY, 1.0).
+-define(BAD_MAGIC_NUM, -12341234).
+
+%% Dispatch Strategies
+-define(DISPATCH_RANDOM, "random").
+-define(DISPATCH_FD_HASH, "fd_hash").
+-define(DISPATCH_SINGLE_SERVER, "single_server").
+-define(DISPATCH_SERVER_PER_SCHEDULER, "server_per_scheduler").
+
+-define(DEFAULT_CLASS_PRIORITIES, [
+ {customer, 1.0},
+ {internal_repl, 0.001},
+ {view_compact, 0.0001},
+ {db_compact, 0.0001},
+ {low, 0.0001},
+ {db_meta, 1.0},
+
+ {db_update, 1.0},
+ {view_update, 1.0},
+ {other, 1.0},
+ {interactive, 1.0}
+]).
+
+
+-record(ioq_request, {
+ fd,
+ msg,
+ key,
+ init_priority = 1.0,
+ fin_priority,
+ ref,
+ from,
+ t0,
+ tsub,
+ shard,
+ user,
+ db,
+ class,
+ ddoc
+}).
+
+
+-type io_priority() :: db_compact
+ | db_update
+ | interactive
+ | internal_repl
+ | other
+ | customer
+ | db_meta
+ | low.
+-type view_io_priority() :: view_compact
+ | view_update.
+-type dbcopy_string() :: string(). %% "dbcopy"
+-type dbname() :: binary() | dbcopy_string().
+-type group_id() :: any().
+-type io_dimensions() :: {io_priority(), dbname()}
+ | {view_io_priority(), dbname(), group_id()}.
+-type ioq_request() :: #ioq_request{}.
+
diff --git a/operator_guide.md b/operator_guide.md
new file mode 100644
index 0000000..81acdcc
--- /dev/null
+++ b/operator_guide.md
@@ -0,0 +1,231 @@
+# An operator's guide to IOQ
+
+IOQ handles the prioritisation of IO operations in the database. It has
+two main responsibilities:
+
+ 1. Providing configurable prioritisation of interactive requests and
+ background requests such as compaction and internal replication.
+ 2. Providing equal prioritisation for interactive requests by backend/database.
+
+From an operational perspective, point 1 is of most interest as it provides a
+set of levers that can be pulled to change the behaviour of the cluster in
+favour of particular workloads or operational concerns.
+
+## Basic overview
+
+From an operational point-of-view, IOQ carries out two fundamental operations:
+
+ 1. Enqueueing requests into one of a number of available channels.
+ 2. Selecting and submitting a request from the available channels according
+ to configured priorities.
+
+IOQ categorises IO requests by class and by priority. The class of a request
+dictates the channel into which it will be enqueued and the priority influences
+the probability that a given request will be dequeued and executed.
+
+The following table lists the IOQ classes and the corresponding priorities. Note
+that the mapping of IOQ classes to class priorities is not 1:1.
+
+```
+|---------------+---------------+--------------------------------------------|
+| IOQ class | IOQ priority | Description |
+|---------------+---------------+--------------------------------------------|
+| interactive | reads, writes | IO requests related to requests made by |
+| | | users via the http layer. |
+| | | |
+| db_update | writes | Interactive IO requests which are database |
+| | | write operations. |
+| | | |
+| view_update | views | IO requests related to view index builds. |
+| | | |
+| db_compact | compaction | IO requests related to database |
+| | | compactions. |
+| | | |
+| view_compact | compaction | IO requests related to view compactions. |
+| | | |
+| internal_repl | replication | IO requests related to internal |
+| | | replication. |
+| | | |
+| low | low | IO requests related to requests made by |
+| | | users via the http layer where the |
+| | | "x-cloudant-priority: low" header is set. |
+| | | |
+| other | undefined | IO requests that do not fit any of the |
+| | | above classes. This includes search IO |
+| | | requests. |
+|---------------+---------------+--------------------------------------------|
+```
+
+## Internals
+
+To understand the relationship between the IOQ classes and the IOQ priorities
+it is helpful to understand the channels into which IO requests are enqueued.
+
+IOQ uses the following four channels:
+
+ - `Compaction`
+ - `Internal replication`
+ - `Low`
+ - `Customer`
+
+The `Customer` channel is effectively a meta-channel where each item in the
+queue represents a backend/dbname combination that consists of a further three
+channels:
+
+ - `Interactive`
+ - `DB update`
+ - `View update`
+
+Requests are enqueued according to the following scheme:
+
+ - Requests with class `internal_repl`, `low`, `db_compact` or `view_compact`
+ are enqueued into the `Internal replication`, `Low` or `Compaction`
+ channels respectively (both compaction classes share `Compaction`).
+ - Requests with class `interactive`, `db_update` or `view_update` are enqueued
+ into the `Interactive`, `DB update` or `View update` channel of the relevant
+ `Customer` channel for the backend/database combination.
+ - Requests with class `other` are enqueued into the `Interactive` queue of a
+ `Customer` channel reserved for `other` IOQ requests.
+
+Requests are submitted as follows:
+
+ - The next item is selected from either the `Compaction`,
+ `Internal replication`, `Low` or `Customer` channel according to the
+ configured priorities (`compaction`, `replication`, `low` and `customer`).
+ - If the item is obtained from the `Compaction`, `Internal replication` or
+ `Low` channels then the request is submitted for execution.
+ - If the item is obtained from the `Customer` channel then the request is
+ selected from either the `Interactive`, `DB update` or `View update` channel
+ according to the configured priorities (`reads`, `writes`, and `views`).
+
+## Configuration
+
+Unless there is prior knowledge of the IOQ configuration required to support the
+intended workload of a cluster on a given hardware specification it is
+recommended that IOQ is initially left with the default configuration values. As
+more becomes known about the behaviour of a cluster under load the IOQ settings
+can be tuned to provide optimal performance for the production workload.
+
+Note that tuning IOQ is not the answer to all performance problems and there
+are a finite number of gains to be had (possibly zero). You should also
+consider the total load on the cluster, the capabilities of the underlying
+hardware, and the usage patterns and design of the applications which sit on
+top of the data layer.
+
+### Priorities
+
+IOQ ships with a default configuration which gives interactive reads/writes and
+view builds a high priority (`1.0`) and the background requests a much lower
+priority (`0.001` for compaction and `0.0001` for replication and low).
+
+You can set the priorities to other values using the config app in a remsh as
+follows:
+
+ config:set("ioq", "views", "0.5", "FBXXXXX reduce views IOQ priority").
+
+To return to the default value just delete the configuration value:
+
+ config:delete("ioq", "views", "FBXXXXX revert to default priority").
+
+The following sections describe typical situations where tuning IOQ priorities
+might be appropriate.
+
+#### Internal replication backlog
+
+If cluster nodes are frequently exhibiting an internal replication backlog
+then it might be worth increasing the `replication` priority.
+
+A backlog can be confirmed by checking the following graphite target:
+
+ net.cloudant.cluster001.db*.erlang.internal_replication_jobs
+
+If this value is consistently elevated by more than a few hundred changes then
+try increasing the `replication` IOQ priority:
+
+ config:set("ioq", "replication", "0.5", "FBXXXXX speed up internal replication").
+
+If this has been effective you should notice a change in the rate at which the
+metric decreases. It is worth experimenting with values as high as `1.0` however
+you will need to keep an eye on HTTP request latencies to make sure there is no
+adverse impact on other aspects of cluster performance.
+
+#### Compactions not completing quickly enough
+
+If disk usage is rising on cluster nodes and there is a corresponding backlog
+in compaction work then it might be worth increasing the `compaction` priority.
+
+Check the volume of pending changes for ongoing compaction jobs in graphite:
+
+ net.cloudant.cluster001.db1.dbcore.active_tasks.changes_pending.*compaction
+
+Increase the priority for `compaction`:
+
+ config:set("ioq", "compaction", "0.5", "FBXXXXX speed up compaction").
+
+Now monitor the changes_pending metrics to see if the rate at which changes are
+being processed has increased.
+
+The notes in the previous section apply here - experiment with values as high
+as `1.0` if necessary and keep a close eye on cluster performance whilst you
+do so.
+
+#### Interactive requests and views competing for IO resource
+
+Metrics might show that read/write performance worsens when views are building
+or conversely that view build performance slows when read/write load increases.
+If the performance requirements of the cluster are such that a particular
+type of request is more critical to the application it supports then it might be
+worth reducing the other IOQ priorities, for example:
+
+ config:set("ioq", "views", "0.1", "FBXXXXX attempt to improve read/write performance").
+
+### Concurrency
+
+The concurrency setting defines the total number of concurrent IO operations
+allowed by IOQ. The default value is `20`, however it can be worth increasing
+if the answer to both of the following questions is yes:
+
+ 1. Either `net.cloudant.cluster001.db1.erlang.io_queue.active_requests` or
+ `net.cloudant.cluster001.db1.couchdb.io_queue.latency` is consistently
+ elevated.
+
+ 2. Disk utilisation is significantly less than 100%.
+
+If performance is being impacted by requests waiting in the queues then it is
+worth bumping IOQ concurrency (sensible values to try are `30`, `50` and `100`)
+and observing the resulting effect.
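+
+Concurrency is changed with the same `config` app as the priorities above,
+for example (the commit message here is illustrative):
+
+    config:set("ioq", "concurrency", "30", "FBXXXXX bump IOQ concurrency").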
+
+Note that increasing this value beyond a certain point can result in the disks
+being overloaded and overall performance degradation. The exact point depends
+on the cluster workload and hardware so it is very important to monitor the
+cluster when making changes here.
+
+### Bypasses
+
+In extreme cases it is possible that IOQ itself is the bottleneck for certain
+request classes. If this is the case then you can bypass IOQ for that request
+class altogether, e.g. for interactive requests:
+
+ config:set("ioq.bypass", "interactive", "true", "FBXXXXX attempt to improve interactive performance").
+
+Note that bypasses are set for IOQ *classes* not IOQ priorities. This means if
+you wanted to bypass all compaction requests you would need to set a bypass for
+`db_compact` and `view_compact`.
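+
+For example, to bypass all compaction work you would set both (the commit
+messages here are illustrative):
+
+    config:set("ioq.bypass", "db_compact", "true", "FBXXXXX bypass db compaction").
+    config:set("ioq.bypass", "view_compact", "true", "FBXXXXX bypass view compaction").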
+
+The following warnings should be heeded when considering setting an IOQ bypass:
+
+ - Other request classes will continue to be routed through IOQ so will not
+ be able to compete with the bypassed requests. You should therefore monitor
+ the cluster carefully to determine that overall performance is acceptable.
+ Keep a close eye on compaction in particular (unless it is being bypassed):
+ if the rate of compaction slows too much the disks may start filling up.
+
+ - The bypass effectively shifts the bottleneck to another part of the system
+ which is typically evident in `couch_file` and `couch_db_updater` message
+ queue backups.
+
+ - Disk performance may also become saturated which could lead to various
+ resulting performance degradations.
+
+A good rule of thumb is to avoid IOQ bypasses altogether unless the customer
+is in immediate pain.
diff --git a/priv/stats_descriptions.cfg b/priv/stats_descriptions.cfg
new file mode 100644
index 0000000..d5b15f4
--- /dev/null
+++ b/priv/stats_descriptions.cfg
@@ -0,0 +1,230 @@
+{[couchdb, io_queue, latency], [
+ {type, histogram},
+ {desc, <<"delay introduced by routing request through IO queue">>}
+]}.
+{[couchdb, io_queue, low], [
+ {type, counter},
+ {desc, <<"number of requests routed through IO at low priority">>}
+]}.
+{[couchdb, io_queue, merged], [
+ {type, counter},
+ {desc, <<"number of requests routed through IO queue that were merged">>}
+]}.
+{[couchdb, io_queue, osproc], [
+ {type, counter},
+ {desc, <<"number of requests routed through IO os queue">>}
+]}.
+{[couchdb, io_queue, reads], [
+ {type, counter},
+ {desc, <<"number of read requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, writes], [
+ {type, counter},
+ {desc, <<"number of write requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, undefined], [
+ {type, counter},
+ {desc, <<"number of requests routed through IO queue without I/O class">>}
+]}.
+{[couchdb, io_queue, unknown], [
+ {type, counter},
+ {desc, <<"number of unknown requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, db_update], [
+ {type, counter},
+ {desc, <<"DB update requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, db_compact], [
+ {type, counter},
+ {desc, <<"DB compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, view_compact], [
+ {type, counter},
+ {desc, <<"view compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, view_update], [
+ {type, counter},
+ {desc, <<"view indexing requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue, interactive], [
+ {type, counter},
+ {desc, <<"IO directly triggered by client requests">>}
+]}.
+{[couchdb, io_queue, internal_repl], [
+ {type, counter},
+ {desc, <<"IO related to internal replication">>}
+]}.
+{[couchdb, io_queue, other], [
+ {type, counter},
+ {desc, <<"IO requests that do not fit any other class">>}
+]}.
+{[couchdb, io_queue_bypassed, low], [
+ {type, counter},
+ {desc, <<"number of requests that bypassed IO at low priority">>}
+]}.
+{[couchdb, io_queue_bypassed, merged], [
+ {type, counter},
+ {desc, <<"number of requests that bypassed IO queue that were merged">>}
+]}.
+{[couchdb, io_queue_bypassed, osproc], [
+ {type, counter},
+ {desc, <<"number of requests that bypassed IO os queue">>}
+]}.
+{[couchdb, io_queue_bypassed, reads], [
+ {type, counter},
+ {desc, <<"number of read requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, writes], [
+ {type, counter},
+ {desc, <<"number of write requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, undefined], [
+ {type, counter},
+ {desc, <<"number of requests that bypassed IO queue without I/O class">>}
+]}.
+{[couchdb, io_queue_bypassed, unknown], [
+ {type, counter},
+ {desc, <<"number of unknown requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, db_update], [
+ {type, counter},
+ {desc, <<"DB update requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, db_compact], [
+ {type, counter},
+ {desc, <<"DB compaction requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, view_compact], [
+ {type, counter},
+ {desc, <<"view compaction requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, view_update], [
+ {type, counter},
+ {desc, <<"view indexing requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue_bypassed, interactive], [
+ {type, counter},
+ {desc, <<"bypassed IO directly triggered by client requests">>}
+]}.
+{[couchdb, io_queue_bypassed, internal_repl], [
+ {type, counter},
+ {desc, <<"bypassed IO related to internal replication">>}
+]}.
+{[couchdb, io_queue_bypassed, other], [
+ {type, counter},
+ {desc, <<"bypassed IO requests that do not fit any other class">>}
+]}.
+
+
+{[couchdb, io_queue2, io_errors], [
+ {type, counter},
+ {desc, <<"number of IO errors">>}
+]}.
+{[couchdb, io_queue2, merged], [
+ {type, counter},
+ {desc, <<"number of requests routed through IO queue that were merged">>}
+]}.
+
+{[couchdb, io_queue2, submit_delay], [
+ {type, histogram},
+ {desc, <<"delay introduced by routing request through IO queue">>}
+]}.
+{[couchdb, io_queue2, svctm], [
+ {type, histogram},
+ {desc, <<"time taken to service the IO request">>}
+]}.
+{[couchdb, io_queue2, iowait], [
+ {type, histogram},
+ {desc, <<"Total time request spent waiting on IO">>}
+]}.
+
+{[couchdb, io_queue2, low, count], [
+ {type, counter},
+ {desc, <<"number of requests routed through IO at low priority">>}
+]}.
+{[couchdb, io_queue2, osproc, count], [
+ {type, counter},
+ {desc, <<"number of requests routed through IO os queue">>}
+]}.
+{[couchdb, io_queue2, reads, count], [
+ {type, counter},
+ {desc, <<"number of read requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, writes, count], [
+ {type, counter},
+ {desc, <<"number of write requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, undefined, count], [
+ {type, counter},
+ {desc, <<"number of requests routed through IO queue without I/O class">>}
+]}.
+{[couchdb, io_queue2, unknown, count], [
+ {type, counter},
+ {desc, <<"number of unknown requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, db_update, count], [
+ {type, counter},
+ {desc, <<"DB update requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, db_compact, count], [
+ {type, counter},
+ {desc, <<"DB compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, view_compact, count], [
+ {type, counter},
+ {desc, <<"view compaction requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, view_update, count], [
+ {type, counter},
+ {desc, <<"view indexing requests routed through IO queue">>}
+]}.
+{[couchdb, io_queue2, interactive, count], [
+ {type, counter},
+ {desc, <<"IO directly triggered by client requests">>}
+]}.
+{[couchdb, io_queue2, db_meta, count], [
+ {type, counter},
+ {desc, <<"IO related to db_meta">>}
+]}.
+{[couchdb, io_queue2, internal_repl, count], [
+ {type, counter},
+ {desc, <<"IO related to internal replication">>}
+]}.
+{[couchdb, io_queue2, other, count], [
+ {type, counter},
+ {desc, <<"IO requests that do not fit any other class">>}
+]}.
+
+{[couchdb, io_queue2, bypassed_count], [
+ {type, counter},
+ {desc, <<"number of requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue2, reads, bypassed_count], [
+ {type, counter},
+ {desc, <<"number of read requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue2, writes, bypassed_count], [
+ {type, counter},
+ {desc, <<"number of write requests that bypassed IO queue">>}
+]}.
+{[couchdb, io_queue2, unknown, bypassed_count], [
+ {type, counter},
+ {desc, <<"number of unknown requests that bypassed IO queue">>}
+]}.
+
+{[couchdb, io_queue2, reads, queued], [
+ {type, counter},
+ {desc, <<"number of read requests queued into IO queue">>}
+]}.
+{[couchdb, io_queue2, writes, queued], [
+ {type, counter},
+ {desc, <<"number of write requests queued into IO queue">>}
+]}.
+{[couchdb, io_queue2, unknown, queued], [
+ {type, counter},
+ {desc, <<"number of unknown requests queued into IO queue">>}
+]}.
+{[couchdb, io_queue2, queued], [
+ {type, counter},
+ {desc, <<"number of requests queued into IO">>}
+]}.
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..b25fdb5
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,11 @@
+{deps, [
+ {proper, ".*", {git, "https://github.com/manopapad/proper.git", "master"}}
+]}.
+
+{eunit_opts, [
+ verbose,
+ {report, {
+ eunit_surefire, [{dir,"."}]
+ }}
+]}.
+
diff --git a/src/ioq.app.src b/src/ioq.app.src
index 65ea50d..04310bb 100644
--- a/src/ioq.app.src
+++ b/src/ioq.app.src
@@ -11,11 +11,14 @@
% the License.
{application,ioq, [
- {description, "I/O prioritizing engine"},
+ {description, "IO request management in a multi-tenant Erlang VM"},
{vsn, git},
{registered,[]},
- {applications,[kernel,stdlib,config]},
+ {applications,[kernel,stdlib,config,couch_stats,hqueue]},
{mod,{ioq_app,[]}},
- {env, []},
- {modules,[ioq,ioq_app,ioq_sup]}
+ {env, [
+ {stats_db, "stats"},
+ {stats_interval, 60000}
+ ]},
+ {modules,[ioq,ioq_app,ioq_osq,ioq_server,ioq_sup]}
]}.
diff --git a/src/ioq.erl b/src/ioq.erl
index 9ca2656..160a448 100644
--- a/src/ioq.erl
+++ b/src/ioq.erl
@@ -11,148 +11,66 @@
% the License.
-module(ioq).
--behaviour(gen_server).
--behaviour(config_listener).
+-export([start/0, stop/0, call/3, call/4, set_disk_concurrency/1,
+ get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0,
+ get_disk_counters/0, get_disk_concurrency/0]).
+-export([
+ ioq2_enabled/0
+]).
--export([start_link/0, call/3]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
+-define(APPS, [config, folsom, couch_stats, ioq]).
-% config_listener api
--export([handle_config_change/5, handle_config_terminate/3]).
+start() ->
+ lists:foldl(fun(App, _) -> application:start(App) end, ok, ?APPS).
--define(RELISTEN_DELAY, 5000).
+stop() ->
+ lists:foldr(fun(App, _) -> ok = application:stop(App) end, ok, ?APPS).
--record(state, {
- concurrency,
- ratio,
- interactive=queue:new(),
- compaction=queue:new(),
- running=[]
-}).
-
--record(request, {
- fd,
- msg,
- priority,
- from,
- ref
-}).
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+call(Fd, Request, Arg, Priority) ->
+ call(Fd, {Request, Arg}, Priority).
+call(Pid, {prompt, _} = Msg, Priority) ->
+ ioq_osq:call(Pid, Msg, Priority);
+call(Pid, {data, _} = Msg, Priority) ->
+ ioq_osq:call(Pid, Msg, Priority);
call(Fd, Msg, Priority) ->
- Request = #request{fd=Fd, msg=Msg, priority=Priority, from=self()},
- try
- gen_server:call(?MODULE, Request, infinity)
- catch
- exit:{noproc,_} ->
- gen_server:call(Fd, Msg, infinity)
+ case ioq2_enabled() of
+ false -> ioq_server:call(Fd, Msg, Priority);
+ true -> ioq_server2:call(Fd, Msg, Priority)
end.
-init(_) ->
- ok = config:listen_for_changes(?MODULE, nil),
- State = #state{},
- {ok, read_config(State)}.
-
-read_config(State) ->
- Ratio = list_to_float(config:get("ioq", "ratio", "0.01")),
- Concurrency = list_to_integer(config:get("ioq", "concurrency", "10")),
- State#state{concurrency=Concurrency, ratio=Ratio}.
-
-handle_call(#request{}=Request, From, State) ->
- {noreply, enqueue_request(Request#request{from=From}, State), 0}.
-
-handle_cast(change, State) ->
- {noreply, read_config(State)};
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({Ref, Reply}, State) ->
- case lists:keytake(Ref, #request.ref, State#state.running) of
- {value, Request, Remaining} ->
- erlang:demonitor(Ref, [flush]),
- gen_server:reply(Request#request.from, Reply),
- {noreply, State#state{running=Remaining}, 0};
- false ->
- {noreply, State, 0}
+set_disk_concurrency(C) when is_integer(C), C > 0 ->
+ case ioq2_enabled() of
+ false -> gen_server:call(ioq_server, {set_concurrency, C});
+ true -> ioq_server2:set_concurrency(C)
end;
-handle_info({'DOWN', Ref, _, _, Reason}, State) ->
- case lists:keytake(Ref, #request.ref, State#state.running) of
- {value, Request, Remaining} ->
- gen_server:reply(Request#request.from, {'EXIT', Reason}),
- {noreply, State#state{running=Remaining}, 0};
- false ->
- {noreply, State, 0}
- end;
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, nil),
- {noreply, State};
-handle_info(timeout, State) ->
- {noreply, maybe_submit_request(State)}.
-
-handle_config_change("ioq", _, _, _, _) ->
- {ok, gen_server:cast(?MODULE, change)};
-handle_config_change(_, _, _, _, _) ->
- {ok, nil}.
+set_disk_concurrency(_) ->
+ erlang:error(badarg).
-handle_config_terminate(_Server, stop, _State) ->
- ok;
-handle_config_terminate(_Server, _Reason, _State) ->
- erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
+get_disk_concurrency() ->
+ case ioq2_enabled() of
+ false -> gen_server:call(ioq_server, get_concurrency);
+ true -> ioq_server2:get_concurrency()
+ end.
-code_change(_Vsn, State, _Extra) ->
- {ok, State}.
+get_disk_queues() ->
+ case ioq2_enabled() of
+ false -> gen_server:call(ioq_server, get_queue_depths);
+ true -> ioq_server2:get_queue_depths()
+ end.
-terminate(_Reason, _State) ->
- ok.
+get_disk_counters() ->
+ case ioq2_enabled() of
+ false -> gen_server:call(ioq_server, get_counters);
+ true -> ioq_server2:get_counters()
+ end.
-enqueue_request(#request{priority={db_compact, _}}=Request, #state{}=State) ->
- State#state{compaction=queue:in(Request, State#state.compaction)};
-enqueue_request(#request{priority={view_compact, _, _}}=Request, #state{}=State) ->
- State#state{compaction=queue:in(Request, State#state.compaction)};
-enqueue_request(#request{}=Request, #state{}=State) ->
- State#state{interactive=queue:in(Request, State#state.interactive)}.
+get_osproc_queues() ->
+ gen_server:call(ioq_osq, get_queue_depths).
-maybe_submit_request(#state{concurrency=Concurrency, running=Running}=State)
- when length(Running) < Concurrency ->
- case make_next_request(State) of
- State ->
- State;
- NewState when length(Running) >= Concurrency - 1 ->
- NewState;
- NewState ->
- maybe_submit_request(NewState)
- end;
-maybe_submit_request(State) ->
- State.
+get_osproc_requests() ->
+ gen_server:call(ioq_osq, get_requests).
-make_next_request(#state{}=State) ->
- case {queue:is_empty(State#state.compaction), queue:is_empty(State#state.interactive)} of
- {true, true} ->
- State;
- {true, false} ->
- choose_next_request(#state.interactive, State);
- {false, true} ->
- choose_next_request(#state.compaction, State);
- {false, false} ->
- case couch_rand:uniform() < State#state.ratio of
- true ->
- choose_next_request(#state.compaction, State);
- false ->
- choose_next_request(#state.interactive, State)
- end
- end.
-
-choose_next_request(Index, State) ->
- case queue:out(element(Index, State)) of
- {empty, _} ->
- State;
- {{value, Request}, Q} ->
- submit_request(Request, setelement(Index, State, Q))
- end.
+ioq2_enabled() ->
+ config:get_boolean("ioq2", "enabled", false).
-submit_request(#request{}=Request, #state{}=State) ->
- Ref = erlang:monitor(process, Request#request.fd),
- Request#request.fd ! {'$gen_call', {self(), Ref}, Request#request.msg},
- State#state{running = [Request#request{ref=Ref} | State#state.running]}.
diff --git a/src/ioq_app.erl b/src/ioq_app.erl
index 2e6d75a..f24dcbb 100644
--- a/src/ioq_app.erl
+++ b/src/ioq_app.erl
@@ -15,6 +15,7 @@
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
+ ok = ioq_kv:init(),
ioq_sup:start_link().
stop(_State) ->
diff --git a/src/ioq_config.erl b/src/ioq_config.erl
new file mode 100644
index 0000000..9cc2371
--- /dev/null
+++ b/src/ioq_config.erl
@@ -0,0 +1,314 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_config).
+
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("ioq/include/ioq.hrl").
+
+
+-export([
+ build_shard_priorities/0,
+ build_shard_priorities/1,
+ build_user_priorities/0,
+ build_user_priorities/1,
+ build_class_priorities/0,
+ build_class_priorities/1,
+ add_default_class_priorities/1,
+ to_float/1,
+ to_float/2,
+ parse_shard_string/1,
+ ioq_classes/0,
+ is_valid_class/1
+]).
+-export([
+ prioritize/4,
+ check_priority/3
+]).
+-export([
+ set_db_config/4,
+ set_shards_config/4,
+ set_shard_config/4,
+ set_class_config/3,
+ set_user_config/3
+]).
+-export([
+ set_bypass/3,
+ set_enabled/2,
+ set_max_priority/2,
+ set_dedupe/2,
+ set_scale_factor/2,
+ set_resize_limit/2,
+ set_concurrency/2,
+ set_dispatch_strategy/2
+]).
+
+
+-define(SHARD_CLASS_SEPARATOR, "||").
+-define(IOQ2_CONFIG, "ioq2").
+-define(IOQ2_BYPASS_CONFIG, "ioq2.bypass").
+-define(IOQ2_SHARDS_CONFIG, "ioq2.shards").
+-define(IOQ2_USERS_CONFIG, "ioq2.users").
+-define(IOQ2_CLASSES_CONFIG, "ioq2.classes").
+
+
+ioq_classes() ->
+ [Class || {Class, _Priority} <- ?DEFAULT_CLASS_PRIORITIES].
+
+
+set_bypass(Class, Value, Reason) when is_atom(Class), is_boolean(Value) ->
+ true = is_valid_class(Class),
+ set_config(?IOQ2_BYPASS_CONFIG, atom_to_list(Class), atom_to_list(Value), Reason).
+
+
+set_enabled(Value, Reason) when is_boolean(Value) ->
+ set_config(?IOQ2_CONFIG, "enabled", atom_to_list(Value), Reason).
+
+
+set_max_priority(Value, Reason) when is_float(Value) ->
+ set_config(?IOQ2_CONFIG, "max_priority", Value, Reason).
+
+
+set_dedupe(Value, Reason) when is_boolean(Value) ->
+ set_config(?IOQ2_CONFIG, "dedupe", atom_to_list(Value), Reason).
+
+
+set_scale_factor(Value, Reason) when is_float(Value) ->
+ set_config(?IOQ2_CONFIG, "scale_factor", float_to_list(Value), Reason).
+
+
+set_resize_limit(Value, Reason) when is_integer(Value) ->
+ set_config(?IOQ2_CONFIG, "resize_limit", integer_to_list(Value), Reason).
+
+
+set_concurrency(Value, Reason) when is_integer(Value) ->
+ set_config(?IOQ2_CONFIG, "concurrency", integer_to_list(Value), Reason).
+
+
+set_dispatch_strategy(Value, Reason) ->
+ ErrorMsg = "Dispatch strategy must be one of "
+ "random, fd_hash, server_per_scheduler, or single_server.",
+ ok = case Value of
+ ?DISPATCH_RANDOM -> ok;
+ ?DISPATCH_FD_HASH -> ok;
+ ?DISPATCH_SINGLE_SERVER -> ok;
+ ?DISPATCH_SERVER_PER_SCHEDULER -> ok;
+ _ -> throw({badarg, ErrorMsg})
+ end,
+ config:set(?IOQ2_CONFIG, "dispatch_strategy", Value, Reason).
+
+
+set_db_config(DbName, Class, Value, Reason) when is_binary(DbName) ->
+ ok = check_float_value(Value),
+ ok = set_shards_config(mem3:shards(DbName), Class, Value, Reason).
+
+
+set_shards_config(Shards, Class, Value, Reason) ->
+ ok = check_float_value(Value),
+ ok = lists:foreach(fun(Shard) ->
+ ok = set_shard_config(Shard, Class, Value, Reason)
+ end, Shards).
+
+
+set_shard_config(#shard{name=Name0}, Class0, Value, Reason) when is_atom(Class0) ->
+ ok = check_float_value(Value),
+ true = is_valid_class(Class0),
+ Name = binary_to_list(filename:rootname(Name0)),
+ Class = atom_to_list(Class0),
+ ConfigName = Name ++ ?SHARD_CLASS_SEPARATOR ++ Class,
+ ok = set_config(?IOQ2_SHARDS_CONFIG, ConfigName, Value, Reason).
+
+
+set_class_config(Class, Value, Reason) when is_atom(Class)->
+ ok = check_float_value(Value),
+ true = is_valid_class(Class),
+ ok = set_config(?IOQ2_CLASSES_CONFIG, atom_to_list(Class), Value, Reason).
+
+
+set_user_config(User, Value, Reason) when is_binary(User) ->
+ set_user_config(binary_to_list(User), Value, Reason);
+set_user_config(User, Value, Reason) ->
+ ok = check_float_value(Value),
+ %% TODO: validate User exists (how to do this without a Req?)
+ ok = set_config(?IOQ2_USERS_CONFIG, User, Value, Reason).
+
+
+is_valid_class(Class) ->
+ lists:member(Class, ioq_classes()).
+
+
+check_float_value(Value) when is_float(Value) ->
+ ok;
+check_float_value(_) ->
+ erlang:error({badarg, invalid_float_value}).
+
+
+set_config(Section, Key, Value, Reason) when is_float(Value) ->
+ set_config(Section, Key, float_to_list(Value), Reason);
+set_config(Section, Key, Value, Reason) when is_binary(Key) ->
+ set_config(Section, binary_to_list(Key), Value, Reason);
+set_config(Section, Key, Value, Reason) ->
+ ok = config:set(Section, Key, Value, Reason).
+
+
+-spec build_shard_priorities() -> {ok, khash:khash()}.
+build_shard_priorities() ->
+ Configs = lists:foldl(
+ fun({Key0, Val}, Acc) ->
+ case parse_shard_string(Key0) of
+ {error, ShardString} ->
+ couch_log:error(
+ "IOQ error parsing shard config: ~p",
+ [ShardString]
+ ),
+ Acc;
+ Key ->
+ [{Key, to_float(Val)} | Acc]
+ end
+ end,
+ [],
+ config:get("ioq2.shards")
+ ),
+ build_shard_priorities(Configs).
+
+
+-spec build_shard_priorities([{any(), float()}]) -> {ok, khash:khash()}.
+build_shard_priorities(Configs) ->
+ init_config_priorities(Configs).
+
+
+-spec build_user_priorities() -> {ok, khash:khash()}.
+build_user_priorities() ->
+ build_user_priorities(config:get("ioq2.users")).
+
+
+-spec build_user_priorities([{any(), float()}]) -> {ok, khash:khash()}.
+build_user_priorities(Configs0) ->
+ Configs = [{list_to_binary(K), to_float(V)} || {K,V} <- Configs0],
+ init_config_priorities(Configs).
+
+
+-spec build_class_priorities() -> {ok, khash:khash()}.
+build_class_priorities() ->
+ build_class_priorities(config:get("ioq2.classes")).
+
+
+-spec build_class_priorities([{any(), float()}]) -> {ok, khash:khash()}.
+build_class_priorities(Configs0) ->
+ {ok, ClassP} = khash:new(),
+ ok = add_default_class_priorities(ClassP),
+ Configs = [{list_to_existing_atom(K), to_float(V)} || {K,V} <- Configs0],
+ init_config_priorities(Configs, ClassP).
+
+
+-spec parse_shard_string(string()) -> {binary(), atom()}
+ | {error, string()}.
+parse_shard_string(ShardString) ->
+ case string:tokens(ShardString, ?SHARD_CLASS_SEPARATOR) of
+ [Shard, Class] ->
+ {list_to_binary(Shard), list_to_existing_atom(Class)};
+ _ ->
+ {error, ShardString}
+ end.
+
+
+-spec add_default_class_priorities(khash:khash()) -> ok.
+add_default_class_priorities(ClassP) ->
+ ok = lists:foreach(
+ fun({Class, Priority}) ->
+ ok = khash:put(ClassP, Class, Priority)
+ end,
+ ?DEFAULT_CLASS_PRIORITIES
+ ).
+
+
+-spec to_float(any()) -> float().
+to_float(V) ->
+ to_float(V, ?DEFAULT_PRIORITY).
+
+
+-spec to_float(any(), float()) -> float().
+to_float(Float, _) when is_float(Float) ->
+ Float;
+to_float(Int, _) when is_integer(Int) ->
+ float(Int);
+to_float(String, Default) when is_list(String) ->
+ try
+ list_to_float(String)
+ catch error:badarg ->
+ try
+ to_float(list_to_integer(String))
+ catch error:badarg ->
+ Default
+ end
+ end;
+to_float(_, Default) ->
+ Default.
+
+
+-spec prioritize(ioq_request(), khash:khash(), khash:khash(), khash:khash()) ->
+ float().
+prioritize(#ioq_request{} = Req, ClassP, UserP, ShardP) ->
+ #ioq_request{
+ user=User,
+ shard=Shard,
+ class=Class
+ } = Req,
+ UP = get_priority(UserP, User),
+ CP = get_priority(ClassP, Class),
+ SP = get_priority(ShardP, {Shard, Class}),
+ UP * CP * SP.
+
+
+-spec init_config_priorities([{any(), float()}]) -> {ok, khash:khash()}.
+init_config_priorities(Configs) ->
+ {ok, Hash} = khash:new(),
+ init_config_priorities(Configs, Hash).
+
+
+-spec init_config_priorities([{any(), float()}], khash:khash()) ->
+ {ok, khash:khash()}.
+init_config_priorities(Configs, Hash) ->
+ ok = lists:foreach(
+ fun({Key, Val}) ->
+ ok = khash:put(Hash, Key, Val)
+ end,
+ Configs
+ ),
+ {ok, Hash}.
+
+
+-spec check_priority(atom(), binary(), binary()) -> float().
+check_priority(Class, User, Shard0) ->
+ {ok, ClassP} = build_class_priorities(),
+ {ok, UserP} = build_user_priorities(),
+ {ok, ShardP} = build_shard_priorities(),
+
+ Shard = filename:rootname(Shard0),
+ Req = #ioq_request{
+ user = User,
+ shard = Shard,
+ class = Class
+ },
+
+ prioritize(Req, ClassP, UserP, ShardP).
+
+
+get_priority(KH, Key) ->
+ get_priority(KH, Key, ?DEFAULT_PRIORITY).
+
+
+get_priority(_KH, undefined, Default) ->
+ Default;
+get_priority(KH, Key, Default) ->
+ khash:get(KH, Key, Default).
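To make the multiplicative model in `prioritize/4` concrete, here is an illustrative configuration (the user, database, and values are hypothetical; unconfigured dimensions fall back to `?DEFAULT_PRIORITY`, which is assumed to be 1.0):

```ini
; hypothetical settings, shown only to illustrate prioritize/4
[ioq2.users]
some_user = 0.5

[ioq2.classes]
db_compact = 2.0

[ioq2.shards]
shards/00000000-1fffffff/some_user/some_db||db_compact = 4.0
```

With these values, a `db_compact` request from `some_user` against that shard scores `0.5 * 2.0 * 4.0 = 4.0`, while the same class of request from the same user against any other shard scores `0.5 * 2.0 * 1.0 = 1.0`. This is the mechanism behind the per-class, per-user, and per-shard/class (de)prioritization described in IOQ2.md.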
diff --git a/src/ioq_config_listener.erl b/src/ioq_config_listener.erl
new file mode 100644
index 0000000..fc10880
--- /dev/null
+++ b/src/ioq_config_listener.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_config_listener).
+
+-vsn(2).
+-behaviour(config_listener).
+
+-export([
+ subscribe/0
+]).
+
+-export([
+ handle_config_change/5,
+ handle_config_terminate/3
+]).
+
+subscribe() ->
+ config:listen_for_changes(?MODULE, nil).
+
+handle_config_change("ioq", _Key, _Val, _Persist, St) ->
+ ok = notify_ioq_pids(),
+ {ok, St};
+handle_config_change("ioq2", _Key, _Val, _Persist, St) ->
+ ok = notify_ioq_pids(),
+ {ok, St};
+handle_config_change("ioq2."++_Type, _Key, _Val, _Persist, St) ->
+ ok = notify_ioq_pids(),
+ {ok, St};
+handle_config_change(_Sec, _Key, _Val, _Persist, St) ->
+ {ok, St}.
+
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(_, _, _) ->
+ % We may have missed a change in the last five seconds
+ gen_server:cast(ioq_server, update_config),
+ spawn(fun() ->
+ timer:sleep(5000),
+ config:listen_for_changes(?MODULE, nil)
+ end).
+
+notify_ioq_pids() ->
+ ok = lists:foreach(fun(Pid) ->
+ gen_server:cast(Pid, update_config)
+ end, ioq_sup:get_ioq2_servers()).
diff --git a/src/ioq_kv.erl b/src/ioq_kv.erl
new file mode 100644
index 0000000..cf8b077
--- /dev/null
+++ b/src/ioq_kv.erl
@@ -0,0 +1,169 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% Based on Bob Ippolito's mochiglobal.erl
+
+%%
+-module(ioq_kv).
+-export([init/0]).
+-export([all/0, get/1, get/2, put/2, delete/1]).
+-export([validate_term/1]).
+
+-define(DYNMOD, ioq_kv_dyn).
+-define(ERLFILE, "ioq_kv_dyn.erl").
+
+-spec init() -> ok.
+%% @doc Initialize the dynamic module
+init() ->
+ compile(all()).
+
+-spec all() -> [{any(), any()}].
+%% @doc Get the list of Key/Val pairs stored
+all() ->
+ try
+ ?DYNMOD:list()
+ catch error:undef ->
+ []
+ end.
+
+-spec get(any()) -> any() | undefined.
+%% @equiv get(Key, undefined)
+get(Key) ->
+ get(Key, undefined).
+
+-spec get(any(), T) -> any() | T.
+%% @doc Get the term for Key or return Default.
+get(Key, Default) ->
+ try
+ ?DYNMOD:lookup(Key, Default)
+ catch error:undef ->
+ Default
+ end.
+
+-spec put(any(), any()) -> ok.
+%% @doc Store term Val at Key, replaces an existing term if present.
+put(Key, Val) ->
+ KVs = proplists:delete(Key, all()),
+ compile([{Key, Val} | KVs]).
+
+-spec delete(any()) -> ok.
+%% @doc Delete term stored at Key, no-op if non-existent.
+delete(Key) ->
+ KVs = proplists:delete(Key, all()),
+ compile(KVs).
+
+
+compile(KVs) ->
+ Bin = compile_mod(KVs),
+ code:purge(?DYNMOD),
+ {module, ?DYNMOD} = code:load_binary(?DYNMOD, ?ERLFILE, Bin),
+ ok.
+
+
+-spec compile_mod([any()]) -> binary().
+compile_mod(KVs) ->
+ Opts = [verbose, report_errors],
+ {ok, ?DYNMOD, Bin} = compile:forms(forms(KVs), Opts),
+ Bin.
+
+
+-spec forms([any()]) -> [erl_syntax:syntaxTree()].
+forms(KVs) ->
+ validate_term(KVs),
+ Statements = [
+ module_stmt(),
+ export_stmt(),
+ list_function(KVs),
+ lookup_function(KVs)
+ ],
+ [erl_syntax:revert(X) || X <- Statements].
+
+
+-spec module_stmt() -> erl_syntax:syntaxTree().
+module_stmt() ->
+ erl_syntax:attribute(
+ erl_syntax:atom(module),
+ [erl_syntax:atom(?DYNMOD)]
+ ).
+
+-spec export_stmt() -> erl_syntax:syntaxTree().
+export_stmt() ->
+ erl_syntax:attribute(
+ erl_syntax:atom(export),
+ [erl_syntax:list([
+ erl_syntax:arity_qualifier(
+ erl_syntax:atom(list),
+ erl_syntax:integer(0)),
+ erl_syntax:arity_qualifier(
+ erl_syntax:atom(lookup),
+ erl_syntax:integer(2))
+ ])]
+ ).
+
+
+-spec list_function([any()]) -> erl_syntax:syntaxTree().
+list_function(KVs) ->
+ erl_syntax:function(
+ erl_syntax:atom(list),
+ [erl_syntax:clause([], none, [erl_syntax:abstract(KVs)])]).
+
+
+-spec lookup_function([any()]) -> erl_syntax:syntaxTree().
+lookup_function(KVs) ->
+ Clauses = lists:foldl(fun({K, V}, ClauseAcc) ->
+ Patterns = [erl_syntax:abstract(K), erl_syntax:underscore()],
+ Bodies = [erl_syntax:abstract(V)],
+ [erl_syntax:clause(Patterns, none, Bodies) | ClauseAcc]
+ end, [default_clause()], KVs),
+ erl_syntax:function(erl_syntax:atom(lookup), Clauses).
+
+
+-spec default_clause() -> erl_syntax:syntaxTree().
+default_clause() ->
+ Patterns = [erl_syntax:underscore(), erl_syntax:variable("Default")],
+ Bodies = [erl_syntax:variable("Default")],
+ erl_syntax:clause(Patterns, none, Bodies).
+
+
+-spec validate_term(any()) -> ok.
+%% @doc Validate that a term is supported. Raises an
+%% invalid_term error if it is not.
+validate_term(T) when is_list(T) ->
+ validate_list(T);
+validate_term(T) when is_tuple(T) ->
+ validate_tuple(T);
+validate_term(T) when is_bitstring(T) ->
+ case bit_size(T) rem 8 of
+ 0 -> ok;
+ _ -> erlang:error(invalid_term)
+ end;
+validate_term(_T) ->
+ ok.
+
+-spec validate_list(list()) -> ok.
+validate_list([]) ->
+ ok;
+validate_list([H|T]) ->
+ validate_term(H),
+ validate_list(T).
+
+-spec validate_tuple(tuple()) -> ok.
+validate_tuple(T) ->
+ validate_tuple(T, 1, size(T)).
+
+-spec validate_tuple(tuple(), pos_integer(), pos_integer()) -> ok.
+validate_tuple(T, Pos, Size) when Pos =< Size ->
+ validate_term(element(Pos, T)),
+ validate_tuple(T, Pos+1, Size);
+validate_tuple(_, _, _) ->
+ ok.
+
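The mochiglobal technique used by `ioq_kv` trades write cost for read speed: every `put/2` or `delete/1` recompiles and hot-loads the generated `ioq_kv_dyn` module, but lookups then become a plain function-clause match against module literals, with no ETS access or message passing. That fits the rarely-changing account/db-to-channel mappings it stores. A usage sketch (assuming `ioq_kv:init/0` has already run, as `ioq_app` now arranges):

```erlang
%% Store, read back, read a missing key with a default, then delete.
ok = ioq_kv:put({<<"acct">>, <<"db">>}, custom_channel),
custom_channel = ioq_kv:get({<<"acct">>, <<"db">>}),
fallback = ioq_kv:get(missing, fallback),
ok = ioq_kv:delete({<<"acct">>, <<"db">>}).
```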
diff --git a/src/ioq_osq.erl b/src/ioq_osq.erl
new file mode 100644
index 0000000..2dbfaea
--- /dev/null
+++ b/src/ioq_osq.erl
@@ -0,0 +1,253 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_osq).
+-behaviour(gen_server).
+-vsn(1).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, call/3]).
+
+-record(channel, {
+ name,
+ q = queue:new()
+}).
+
+-record(state, {
+ reqs = [],
+ min = 2,
+ max = 6,
+ global_max = 15,
+ channels = queue:new()
+}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% This server relies on the internal structure of the channels queue as a %%
+%% {list(), list()} to do in-place modifications of some elements. We are %%
+%% "running on thin ice", as it were. %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+call(Pid, Msg, Priority) ->
+ Reply = gen_server:call(ioq_osq, {rlimit, nil, Priority, now()}, infinity),
+ try
+ gen_server:call(Pid, Msg, infinity)
+ after
+ whereis(ioq_osq) ! Reply
+ end.
+
+
+init([]) ->
+ ets:new(osq_counters, [named_table]),
+ St = #state{},
+ {ok, St#state{
+ min = threshold("minimum", St#state.min),
+ max = threshold("maximum", St#state.max),
+ global_max = threshold("global_maximum", St#state.global_max)
+ }}.
+
+handle_call({set_minimum, C}, _From, State) when is_integer(C), C > 0 ->
+ {reply, ok, State#state{min = C}};
+
+handle_call({set_maximum, C}, _From, State) when is_integer(C), C > 0 ->
+ {reply, ok, State#state{max = C}};
+
+handle_call({set_global_maximum, C}, _From, State) when is_integer(C), C > 0 ->
+ {reply, ok, State#state{global_max = C}};
+
+handle_call(get_queue_depths, _From, State) ->
+ Channels = [{N, queue:len(Q)} || #channel{name=N, q=Q}
+ <- queue:to_list(State#state.channels)],
+ {reply, Channels, State};
+
+handle_call(get_requests, _From, State) ->
+ {reply, State#state.reqs, State};
+
+handle_call({_, _, {interactive, Shard}, _} = Req, From, State) ->
+ {noreply, enqueue_channel(channel_name(Shard), {Req, From}, State)};
+
+handle_call({_, _, {view_update, Shard, _}, _} = Req, From, State) ->
+ {noreply, enqueue_channel(channel_name(Shard), {Req, From}, State)};
+
+handle_call({Fd, _, _, _} = Req, From, State) when is_pid(Fd) ->
+ {noreply, enqueue_channel(other, {Req, From}, State)};
+
+handle_call({rlimit, _, _, _} = Req, From, State) ->
+ {noreply, enqueue_channel(other, {Req, From}, State)};
+
+handle_call(_Msg, _From, State) ->
+ {reply, ignored, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
+ case lists:keyfind(Ref, 3, Reqs) of
+ {_, notify, Ref} ->
+ erlang:demonitor(Ref, [flush]),
+ Reqs2 = lists:keydelete(Ref, 3, Reqs),
+ {noreply, make_next_request(State#state{reqs = Reqs2})};
+ {_, From, Ref} ->
+ erlang:demonitor(Ref, [flush]),
+ gen_server:reply(From, Reply),
+ Reqs2 = lists:keydelete(Ref, 3, Reqs),
+ {noreply, make_next_request(State#state{reqs = Reqs2})};
+ false ->
+ {noreply, State}
+ end;
+
+handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) ->
+ case lists:keyfind(Ref, 3, Reqs) of
+ {_, notify, Ref} ->
+ Reqs2 = lists:keydelete(Ref, 3, Reqs),
+ {noreply, make_next_request(State#state{reqs = Reqs2})};
+ {_, From, Ref} ->
+ gen_server:reply(From, {'EXIT', Reason}),
+ Reqs2 = lists:keydelete(Ref, 3, Reqs),
+ {noreply, make_next_request(State#state{reqs = Reqs2})};
+ false ->
+ {noreply, State}
+ end;
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+channel_name(Shard) ->
+ try re:split(Shard, "/") of
+ [<<"shards">>, _, <<"heroku">>, AppId | _] ->
+ <<AppId/binary, ".heroku">>;
+ [<<"shards">>, _, Account | _] ->
+ Account;
+ _ ->
+ other
+ catch _:_ ->
+ other
+ end.
+
+find_channel(Account, {A, B}) ->
+ case lists:keyfind(Account, 2, A) of
+ false ->
+ case lists:keyfind(Account, 2, B) of
+ false ->
+ {new, #channel{name = Account}};
+ #channel{} = Channel ->
+ {2, Channel}
+ end;
+ #channel{} = Channel ->
+ {1, Channel}
+ end.
+
+update_channel(#channel{q = Q} = Ch, Req) ->
+ Ch#channel{q = queue:in(Req, Q)}.
+
+enqueue_channel(Account, Req, #state{channels = Q} = State) ->
+ NewState = case find_channel(Account, Q) of
+ {new, Channel0} ->
+ State#state{channels = queue:in(update_channel(Channel0, Req), Q)};
+ {Elem, Channel0} ->
+ Channel = update_channel(Channel0, Req),
+ % the channel already exists in the queue - update it in place
+ L = element(Elem, Q),
+ NewQ = setelement(Elem, Q, lists:keyreplace(Account, 2, L, Channel)),
+ State#state{channels = NewQ}
+ end,
+ maybe_submit_request(NewState).
+
+maybe_submit_request(#state{global_max=C, reqs=R} = St) when length(R) < C ->
+ make_next_request(St);
+maybe_submit_request(#state{min = Min} = State) ->
+ % look for a channel which hasn't reached the minimum yet
+ make_next_request(State, Min).
+
+make_next_request(#state{max = Max} = State) ->
+ % default behavior, look for a channel not yet maxed out
+ make_next_request(State, Max).
+
+make_next_request(#state{channels = Channels, reqs = R} = State, Threshold) ->
+ case next_unblocked_channel(Channels, R, Threshold, queue:new()) of
+ {#channel{name = Name, q = Q} = Ch, OutChannels} ->
+ {{value, Item}, NewQ} = queue:out(Q),
+ case queue:is_empty(NewQ) of
+ true ->
+ NewCh = OutChannels;
+ false ->
+ NewCh = queue:in(Ch#channel{q = NewQ}, OutChannels)
+ end,
+ submit_request(Name, Item, State#state{channels = NewCh});
+ {nil, OutQ} ->
+ % everybody is using their allotted slots, try again later
+ State#state{channels = OutQ}
+ end.
+
+next_unblocked_channel(InQ, Reqs, Max, OutQ) ->
+ case queue:out(InQ) of
+ {empty, _} -> % all channels blocked
+ {nil, OutQ};
+ {{value, #channel{name=Name} = Channel}, NewQ} ->
+ case length([1 || {N, _, _} <- Reqs, N =:= Name]) >= Max of
+ true -> % channel is blocked, keep searching
+ next_unblocked_channel(NewQ, Reqs, Max, queue:in(Channel, OutQ));
+ false ->
+ {Channel, queue:join(NewQ, OutQ)}
+ end
+ end.
+
+
+submit_request(Channel, {{rlimit,_,Pri,T0}, From}, #state{reqs=Reqs} = State) ->
+ % an rlimit request means the caller performs the
+ % gen_server call itself and notifies us with the
+ % granted ref once it completes
+ Ref = erlang:monitor(process, element(1, From)),
+ gen_server:reply(From, {Ref, nil}),
+ record_stats(Channel, Pri, T0),
+ State#state{reqs = [{Channel, notify, Ref} | Reqs]};
+
+submit_request(Channel, {{Fd,Call,Pri,T0}, From}, #state{reqs=Reqs} = State) ->
+ % make the request
+ Ref = erlang:monitor(process, Fd),
+ Fd ! {'$gen_call', {self(), Ref}, Call},
+ record_stats(Channel, Pri, T0),
+ State#state{reqs = [{Channel, From, Ref} | Reqs]}.
+
+record_stats(Channel, Pri, T0) ->
+ IOClass = if is_tuple(Pri) -> element(1, Pri); true -> Pri end,
+ Latency = timer:now_diff(now(),T0) / 1000,
+ catch couch_stats:increment_counter([couchdb, io_queue, IOClass]),
+ catch couch_stats:increment_counter([couchdb, io_queue, osproc]),
+ catch couch_stats:update_histogram([couchdb, io_queue, latency], Latency),
+ update_counter(Channel, IOClass, osproc).
+
+update_counter(Channel, IOClass, RW) ->
+ try ets:update_counter(osq_counters, {Channel, IOClass, RW}, 1)
+ catch error:badarg ->
+ ets:insert(osq_counters, {{Channel, IOClass, RW}, 1})
+ end.
+
+threshold(Name, Default) ->
+ try list_to_integer(config:get("osq", Name)) of
+ C when C > 0 ->
+ C;
+ _ ->
+ Default
+ catch _:_ ->
+ Default
+ end.
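A sketch of the admission-control flow implemented by `ioq_osq:call/3` above (illustrative; `Pid`, `Msg`, and `Priority` are placeholders for the caller's own values):

```erlang
%% 1. Ask ioq_osq for a slot; the request queues per-channel until it is
%%    admitted under the per-channel and global concurrency limits.
{Ref, nil} = gen_server:call(ioq_osq, {rlimit, nil, Priority, now()}, infinity),
try
    %% 2. The caller, not ioq_osq, performs the real call.
    gen_server:call(Pid, Msg, infinity)
after
    %% 3. Release the slot: ioq_osq matches the {Channel, notify, Ref}
    %%    entry, demonitors, and dispatches the next queued request.
    whereis(ioq_osq) ! {Ref, nil}
end.
```

Because the queue server only gates admission and never proxies the payload, a slow OS process blocks its caller but not `ioq_osq` itself.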
diff --git a/src/ioq_server.erl b/src/ioq_server.erl
new file mode 100644
index 0000000..8e0891d
--- /dev/null
+++ b/src/ioq_server.erl
@@ -0,0 +1,608 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_server).
+-behaviour(gen_server).
+-vsn(1).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, call/3]).
+-export([add_channel/3, rem_channel/2, list_custom_channels/0]).
+
+-record(channel, {
+ name,
+ qI = queue:new(), % client readers
+ qU = queue:new(), % db updater
+ qV = queue:new() % view index updates
+}).
+
+-record(state, {
+ counters,
+ histos,
+ reqs = [],
+ concurrency = 20,
+ channels = queue:new(),
+ qC = queue:new(), % compaction
+ qR = queue:new(), % internal replication
+ qL = queue:new(),
+ dedupe,
+ class_priorities,
+ op_priorities
+}).
+
+-record(request, {
+ fd,
+ msg,
+ class,
+ channel, % the name of the channel, not the actual data structure
+ from,
+ ref,
+ t0,
+ tsub
+}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% This server relies on the internal structure of the channels queue as a %%
+%% {list(), list()} to do in-place modifications of some elements. We are %%
+%% "running on thin ice", as it were. %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+call(Fd, Msg, Priority) ->
+ {Class, Channel} = analyze_priority(Priority),
+ Request = #request{
+ fd = Fd,
+ msg = Msg,
+ channel = Channel,
+ class = Class,
+ t0 = now()
+ },
+ case config:get("ioq.bypass", atom_to_list(Class)) of
+ "true" ->
+ RW = rw(Msg),
+ catch couch_stats:increment_counter([couchdb, io_queue_bypassed, Class]),
+ catch couch_stats:increment_counter([couchdb, io_queue_bypassed, RW]),
+ gen_server:call(Fd, Msg, infinity);
+ _ ->
+ gen_server:call(?MODULE, Request, infinity)
+ end.
+
+add_channel(Account, DbName, ChannelName) ->
+ ok = ioq_kv:put({Account, DbName}, ChannelName).
+
+rem_channel(Account, DbName) ->
+ ok = ioq_kv:delete({Account, DbName}).
+
+list_custom_channels() ->
+ ioq_kv:all().
+
+init([]) ->
+ State = #state{
+ counters = ets:new(ioq_counters, []),
+ histos = ets:new(ioq_histos, [named_table, ordered_set])
+ },
+ erlang:send_after(get_interval(), self(), dump_table),
+ {ok, update_config(State)}.
+
+handle_call({set_priority, Pri}, _From, State) ->
+ {reply, process_flag(priority, Pri), State, 0};
+
+handle_call({set_concurrency, C}, _From, State) when is_integer(C), C > 0 ->
+ {reply, State#state.concurrency, State#state{concurrency = C}, 0};
+
+handle_call(get_concurrency, _From, State) ->
+ {reply, State#state.concurrency, State, 0};
+
+handle_call(get_counters, _From, #state{counters = Tab} = State) ->
+ {reply, Tab, State, 0};
+
+handle_call(get_queue_depths, _From, State) ->
+ Channels = lists:map(fun(#channel{name=N, qI=I, qU=U, qV=V}) ->
+ {N, [queue:len(I), queue:len(U), queue:len(V)]}
+ end, queue:to_list(State#state.channels)),
+ Response = [
+ {compaction, queue:len(State#state.qC)},
+ {replication, queue:len(State#state.qR)},
+ {low, queue:len(State#state.qL)},
+ {channels, {Channels}}
+ ],
+ {reply, Response, State, 0};
+
+handle_call(reset_histos, _From, State) ->
+ ets:delete_all_objects(State#state.histos),
+ {reply, ok, State, 0};
+
+handle_call(#request{} = Req, From, State) ->
+ {noreply, enqueue_request(Req#request{from = From}, State), 0};
+
+%% backwards-compatible mode for messages sent during hot upgrade
+handle_call({Fd, Msg, Priority, T0}, From, State) ->
+ {Class, Chan} = analyze_priority(Priority),
+ R = #request{fd=Fd, msg=Msg, channel=Chan, class=Class, t0=T0, from=From},
+ {noreply, enqueue_request(R, State), 0};
+
+handle_call(_Msg, _From, State) ->
+ {reply, ignored, State, 0}.
+
+handle_cast(update_config, State) ->
+ {noreply, update_config(State), 0};
+
+handle_cast(_Msg, State) ->
+ {noreply, State, 0}.
+
+handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
+ case lists:keytake(Ref, #request.ref, Reqs) of
+ {value, #request{from=From} = Req, Reqs2} ->
+ TResponse = erlang:now(),
+ erlang:demonitor(Ref, [flush]),
+ reply_to_all(From, Reply),
+ update_histograms(ioq_histos, Req, TResponse),
+ {noreply, State#state{reqs = Reqs2}, 0};
+ false ->
+ {noreply, State, 0}
+ end;
+
+handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) ->
+ case lists:keytake(Ref, #request.ref, Reqs) of
+ {value, #request{from=From}, Reqs2} ->
+ reply_to_all(From, {'EXIT', Reason}),
+ {noreply, State#state{reqs = Reqs2}, 0};
+ false ->
+ {noreply, State, 0}
+ end;
+
+handle_info(dump_table, State) ->
+ erlang:send_after(get_interval(), self(), dump_table),
+ {noreply, dump_table(State), 0};
+
+handle_info(timeout, State) ->
+ {noreply, maybe_submit_request(State)};
+
+handle_info(_Info, State) ->
+ {noreply, State, 0}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, #state{}=State, _Extra) ->
+ {ok, State}.
+
+update_config(State) ->
+ Concurrency = try
+ list_to_integer(config:get("ioq", "concurrency", "20"))
+ catch _:_ ->
+ 20
+ end,
+
+ DeDupe = config:get("ioq", "dedupe", "true") == "true",
+
+ P1 = to_float(catch config:get("ioq", "customer", "1.0")),
+ P2 = to_float(catch config:get("ioq", "replication", "0.001")),
+ P3 = to_float(catch config:get("ioq", "compaction", "0.0001")),
+ P4 = to_float(catch config:get("ioq", "low", "0.0001")),
+
+ P5 = to_float(catch config:get("ioq", "reads", "1.0")),
+ P6 = to_float(catch config:get("ioq", "writes", "1.0")),
+ P7 = to_float(catch config:get("ioq", "views", "1.0")),
+
+ State#state{
+ concurrency = Concurrency,
+ dedupe = DeDupe,
+ class_priorities = [P1, P2, P3, P4],
+ op_priorities = [P5, P6, P7]
+ }.
+
+reply_to_all([], _Reply) ->
+ ok;
+reply_to_all([From|Rest], Reply) ->
+ gen_server:reply(From, Reply),
+ reply_to_all(Rest, Reply);
+reply_to_all(From, Reply) ->
+ gen_server:reply(From, Reply).
+
+analyze_priority({interactive, Shard}) ->
+ {interactive, channel_name(Shard)};
+analyze_priority({db_update, Shard}) ->
+ {db_update, channel_name(Shard)};
+analyze_priority({view_update, Shard, _GroupId}) ->
+ {view_update, channel_name(Shard)};
+analyze_priority({db_compact, _Shard}) ->
+ {db_compact, nil};
+analyze_priority({view_compact, _Shard, _GroupId}) ->
+ {view_compact, nil};
+analyze_priority({internal_repl, _Shard}) ->
+ {internal_repl, nil};
+analyze_priority({low, _Shard}) ->
+ {low, nil};
+analyze_priority(_Else) ->
+ {other, other}.
+
+channel_name(Shard) ->
+ try split(Shard) of
+ [<<"shards">>, _, <<"heroku">>, AppId | _] ->
+ <<AppId/binary, ".heroku">>;
+ [<<"shards">>, _, DbName] ->
+ ioq_kv:get({other, DbName}, other);
+ [<<"shards">>, _, Account, DbName] ->
+ ioq_kv:get({Account, DbName}, Account);
+ [<<"shards">>, _, Account | DbParts] ->
+ ioq_kv:get({Account, filename:join(DbParts)}, Account);
+ _ ->
+ other
+ catch _:_ ->
+ other
+ end.
+
+enqueue_request(#request{class = db_compact} = Req, State) ->
+ State#state{qC = update_queue(Req, State#state.qC, State#state.dedupe)};
+enqueue_request(#request{class = view_compact} = Req, State) ->
+ State#state{qC = update_queue(Req, State#state.qC, State#state.dedupe)};
+enqueue_request(#request{class = internal_repl} = Req, State) ->
+ State#state{qR = update_queue(Req, State#state.qR, State#state.dedupe)};
+enqueue_request(#request{class = low} = Req, State) ->
+ State#state{qL = update_queue(Req, State#state.qL, State#state.dedupe)};
+enqueue_request(Req, State) ->
+ enqueue_channel(Req, State).
+
+find_channel(Account, {A, B}) ->
+ case lists:keyfind(Account, #channel.name, A) of
+ false ->
+ case lists:keyfind(Account, #channel.name, B) of
+ false ->
+ {new, #channel{name = Account}};
+ #channel{} = Channel ->
+ {2, Channel}
+ end;
+ #channel{} = Channel ->
+ {1, Channel}
+ end.
+
+update_channel(Ch, #request{class = view_update} = Req, Dedupe) ->
+ Ch#channel{qV = update_queue(Req, Ch#channel.qV, Dedupe)};
+update_channel(Ch, #request{class = db_update} = Req, Dedupe) ->
+ Ch#channel{qU = update_queue(Req, Ch#channel.qU, Dedupe)};
+update_channel(Ch, Req, Dedupe) ->
+ % everything else is interactive IO class
+ Ch#channel{qI = update_queue(Req, Ch#channel.qI, Dedupe)}.
+
+update_queue(#request{from=From, fd=Fd, msg={pread_iolist, Pos}}=Req, Q, DD) ->
+ case maybe_dedupe(Fd, Pos, Q, DD) of
+ false ->
+ queue:in(Req, Q);
+ {Elem, N, #request{from=From1}=Match} ->
+ catch couch_stats:increment_counter([couchdb, io_queue, merged]),
+ Match1 = Match#request{from=append(From, From1)},
+ L = element(Elem, Q),
+ {H, [Match|T]} = lists:split(N, L),
+ setelement(Elem, Q, H ++ [Match1|T])
+ end;
+update_queue(Req, Q, _Dedupe) ->
+ queue:in(Req, Q).
+
+append(A, B) when is_list(B) ->
+ [A|B];
+append(A, B) ->
+ [A, B].
+
+maybe_dedupe(Fd, Pos, Q, Dedupe) ->
+ case Dedupe of
+ true -> matching_request(Fd, Pos, Q);
+ false -> false
+ end.
+
+matching_request(Fd, Pos, {A, B}) ->
+ case matching_request(Fd, Pos, A) of
+ false ->
+ case matching_request(Fd, Pos, B) of
+ false ->
+ false;
+ {N, Request} ->
+ {2, N, Request}
+ end;
+ {N, Request} ->
+ {1, N, Request}
+ end;
+matching_request(Fd, Pos, List) ->
+ matching_request(Fd, Pos, 0, List).
+
+matching_request(_Fd, _Pos, _N, []) ->
+ false;
+matching_request(Fd, Pos, N, [#request{fd=Fd, msg={pread_iolist, Pos}}=Req|_]) ->
+ {N, Req};
+matching_request(Fd, Pos, N, [_|Rest]) ->
+ matching_request(Fd, Pos, N + 1, Rest).
+
+enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
+ DD = State#state.dedupe,
+ case find_channel(Account, Q) of
+ {new, Channel0} ->
+ State#state{channels = queue:in(update_channel(Channel0, Req, DD), Q)};
+ {Elem, Channel0} ->
+ Channel = update_channel(Channel0, Req, DD),
+ % the channel already exists in the queue - update it in place
+ L = element(Elem, Q),
+ NewL = lists:keyreplace(Account, #channel.name, L, Channel),
+ NewQ = setelement(Elem, Q, NewL),
+ State#state{channels = NewQ}
+ end.
+
+maybe_submit_request(#state{concurrency=C,reqs=R} = St) when length(R) < C ->
+ case make_next_request(St) of
+ St ->
+ St;
+ NewState when length(R) >= C-1 ->
+ NewState;
+ NewState ->
+ maybe_submit_request(NewState)
+ end;
+maybe_submit_request(State) ->
+ State.
+
+sort_queues(QueuesAndPriorities, Normalization, Choice) ->
+ sort_queues(QueuesAndPriorities, Normalization, Choice, 0, [], []).
+
+sort_queues([{Q, _Pri}], _Norm, _Choice, _X, [], Acc) ->
+ lists:reverse([Q | Acc]);
+sort_queues([{Q, Pri}], Norm, Choice, _X, Skipped, Acc) ->
+ sort_queues(lists:reverse(Skipped), Norm - Pri, Choice, 0, [], [Q | Acc]);
+sort_queues([{Q, Pri} | Rest], Norm, Choice, X, Skipped, Acc) ->
+ if Choice < ((X + Pri) / Norm) ->
+ Remaining = lists:reverse(Skipped, Rest),
+ sort_queues(Remaining, Norm - Pri, Choice, 0, [], [Q | Acc]);
+ true ->
+ sort_queues(Rest, Norm, Choice, X + Pri, [{Q, Pri} | Skipped], Acc)
+ end.
+
+make_next_request(State) ->
+ #state{
+ channels = Ch,
+ qC = QC,
+ qR = QR,
+ qL = QL,
+ class_priorities = ClassP,
+ op_priorities = OpP
+ } = State,
+
+
+ {Item, [NewCh, NewQR, NewQC, NewQL]} =
+ choose_next_request([Ch, QR, QC, QL], ClassP),
+
+ case Item of nil ->
+ State;
+ #channel{qI = QI, qU = QU, qV = QV} = Channel ->
+ % An IO channel has at least one interactive, db update, or view indexing request.
+ % If the channel has more than one request, we'll toss it back into the
+ % queue after we've extracted one here
+ {Item2, [QI2, QU2, QV2]} =
+ choose_next_request([QI, QU, QV], OpP),
+ case queue:is_empty(QU2) andalso
+ queue:is_empty(QI2) andalso
+ queue:is_empty(QV2) of
+ true ->
+ NewCh2 = NewCh;
+ false ->
+ NewCh2 = queue:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
+ end,
+ submit_request(Item2, State#state{channels=NewCh2, qC=NewQC,
+ qR=NewQR, qL=NewQL});
+ _ ->
+ % Item is a background (compaction, internal replication, or low priority) task
+ submit_request(Item, State#state{channels=NewCh, qC=NewQC, qR=NewQR,
+ qL=NewQL})
+ end.
+
+submit_request(Request, State) ->
+ #request{
+ channel = Channel,
+ fd = Fd,
+ msg = Call,
+ t0 = T0,
+ class = IOClass
+ } = Request,
+ #state{reqs = Reqs, counters = Counters} = State,
+
+ % make the request
+ Ref = erlang:monitor(process, Fd),
+ Fd ! {'$gen_call', {self(), Ref}, Call},
+
+ % record some stats
+ RW = rw(Call),
+ SubmitTime = os:timestamp(),
+ Latency = timer:now_diff(SubmitTime, T0) / 1000,
+ catch couch_stats:increment_counter([couchdb, io_queue, IOClass]),
+ catch couch_stats:increment_counter([couchdb, io_queue, RW]),
+ catch couch_stats:update_histogram([couchdb, io_queue, latency], Latency),
+ update_counter(Counters, Channel, IOClass, RW),
+ State#state{reqs = [Request#request{tsub=SubmitTime, ref=Ref} | Reqs]}.
+
+update_counter(Tab, Channel, IOClass, RW) ->
+ upsert(Tab, {Channel, IOClass, RW}, 1).
+
+update_histograms(Tab, Req, TResponse) ->
+ #request{t0=T0, tsub=TSubmit, class=Class, channel=Channel, msg=Msg} = Req,
+ Delta1 = timer:now_diff(TSubmit, T0),
+ Delta2 = timer:now_diff(TResponse, TSubmit),
+ Bin1 = timebin(Delta1),
+ Bin2 = timebin(Delta2),
+ Bin3 = timebin(Delta1+Delta2),
+ if Channel =/= nil ->
+ upsert(Tab, {Channel, submit_delay, Bin1}, 1),
+ upsert(Tab, {Channel, svctm, Bin2}, 1),
+ upsert(Tab, {Channel, iowait, Bin3}, 1);
+ true -> ok end,
+ Key = make_key(Class, Msg),
+ upsert(Tab, {Key, submit_delay, Bin1}, 1),
+ upsert(Tab, {Key, svctm, Bin2}, 1),
+ upsert(Tab, {Key, iowait, Bin3}, 1).
+
+make_key(db_compact, _) ->
+ <<"compaction">>;
+make_key(view_compact, _) ->
+ <<"compaction">>;
+make_key(internal_repl, _) ->
+ <<"replication">>;
+make_key(low, _) ->
+ <<"low">>;
+make_key(view_update, _) ->
+ <<"views">>;
+make_key(db_update, _) ->
+ <<"writes">>;
+make_key(interactive, {pread_iolist, _}) ->
+ <<"reads">>;
+make_key(interactive, {append_bin, _}) ->
+ <<"writes">>;
+make_key(_, _) ->
+ <<"other">>.
+
+upsert(Tab, Key, Incr) ->
+ try ets:update_counter(Tab, Key, Incr)
+ catch error:badarg ->
+ ets:insert(Tab, {Key, Incr})
+ end.
+
+timebin(V) ->
+ trunc(10*math:log10(V)).
+
+choose_next_request(Qs, Priorities) ->
+ Norm = lists:sum(Priorities),
+ QueuesAndPriorities = lists:zip(Qs, Priorities),
+ SortedQueues = sort_queues(QueuesAndPriorities, Norm, random:uniform()),
+ {Item, NewQueues} = choose_prioritized_request(SortedQueues),
+ Map0 = lists:zip(SortedQueues, NewQueues),
+ {Item, [element(2, lists:keyfind(Q, 1, Map0)) || Q <- Qs]}.
+
+choose_prioritized_request(Qs) ->
+ choose_prioritized_request(Qs, []).
+
+choose_prioritized_request([], Empties) ->
+ {nil, lists:reverse(Empties)};
+choose_prioritized_request([Q | Rest], Empties) ->
+ case queue:out(Q) of
+ {empty, _} ->
+ choose_prioritized_request(Rest, [Q | Empties]);
+ {{value, Item}, NewQ} ->
+ {Item, lists:reverse([NewQ | Empties], Rest)}
+ end.
+
+to_float("0") ->
+ 0.00001;
+to_float("1") ->
+ 1.0;
+to_float(String) when is_list(String) ->
+ try list_to_float(String) catch error:badarg -> 0.5 end;
+to_float(_) ->
+ 0.5.
+
+dump_table(#state{counters = Tab} = State) ->
+ Pid = spawn(fun save_to_db/0),
+ ets:give_away(Tab, Pid, nil),
+ State#state{counters = ets:new(ioq_counters, [])}.
+
+save_to_db() ->
+ Timeout = get_interval(),
+ receive {'ETS-TRANSFER', Tab, _, _} ->
+ Dict = ets:foldl(fun table_fold/2, dict:new(), Tab),
+ TS = list_to_binary(iso8601_timestamp()),
+ Doc = {[
+ {<<"_id">>, TS},
+ {type, ioq},
+ {node, node()},
+ {accounts, {dict:to_list(Dict)}}
+ ]},
+ try
+ fabric:update_doc(get_stats_dbname(), Doc, [])
+ catch error:database_does_not_exist ->
+ couch_log:error("Missing IOQ stats db: ~s", [get_stats_dbname()])
+ end
+ after Timeout ->
+ error_logger:error_report({?MODULE, "ets transfer failed"})
+ end.
+
+table_fold({{other, _, _}, _}, D) ->
+ D;
+table_fold({{Channel, interactive, reads}, X}, D) ->
+ dict:update(Channel, fun([A,B,C]) -> [A+X,B,C] end, [X,0,0], D);
+table_fold({{Channel, interactive, writes}, X}, D) ->
+ dict:update(Channel, fun([A,B,C]) -> [A,B+X,C] end, [0,X,0], D);
+table_fold({{Channel, db_update, reads}, X}, D) ->
+ dict:update(Channel, fun([A,B,C]) -> [A,B+X,C] end, [0,X,0], D);
+table_fold({{Channel, db_update, writes}, X}, D) ->
+ dict:update(Channel, fun([A,B,C]) -> [A,B+X,C] end, [0,X,0], D);
+table_fold({{Channel, view_update, reads}, X}, D) ->
+ dict:update(Channel, fun([A,B,C]) -> [A,B,C+X] end, [0,0,X], D);
+table_fold({{Channel, view_update, writes}, X}, D) ->
+ dict:update(Channel, fun([A,B,C]) -> [A,B,C+X] end, [0,0,X], D);
+table_fold(_, D) ->
+ D.
+
+iso8601_timestamp() ->
+ {_,_,Micro} = Now = os:timestamp(),
+ {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
+ Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
+ io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
+
+get_interval() ->
+ case application:get_env(ioq, stats_interval) of
+ {ok, Interval} when is_integer(Interval) ->
+ Interval;
+ _ ->
+ 60000
+ end.
+
+get_stats_dbname() ->
+ case application:get_env(ioq, stats_db) of
+ {ok, DbName} when is_list(DbName) ->
+ DbName;
+ _ ->
+ "stats"
+ end.
+
+split(B) when is_binary(B) ->
+ split(B, 0, 0, []);
+split(B) -> B.
+
+split(B, O, S, Acc) ->
+ case B of
+ <<_:O/binary>> ->
+ Len = O - S,
+ <<_:S/binary, Part:Len/binary>> = B,
+ lists:reverse(Acc, [Part]);
+ <<_:O/binary, $/, _/binary>> ->
+ Len = O - S,
+ <<_:S/binary, Part:Len/binary, _/binary>> = B,
+ split(B, O+1, O+1, [Part | Acc]);
+ _ ->
+ split(B, O+1, S, Acc)
+ end.
+
+rw({pread_iolist, _}) ->
+ reads;
+rw({append_bin, _}) ->
+ writes;
+rw(_) ->
+ unknown.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+sort_queues_test() ->
+ ?assertEqual([a, b, c], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.10)),
+ ?assertEqual([a, c, b], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.45)),
+ ?assertEqual([b, a, c], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.60)),
+ ?assertEqual([b, c, a], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.65)),
+ ?assertEqual([c, a, b], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.71)),
+ ?assertEqual([c, b, a], sort_queues([{a,0.5}, {b,0.2}, {c,0.3}], 1, 0.90)).
+
+-endif.
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
new file mode 100644
index 0000000..5e2e01f
--- /dev/null
+++ b/src/ioq_server2.erl
@@ -0,0 +1,1068 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_server2).
+-behavior(gen_server).
+
+
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+]).
+-export([
+ start_link/3,
+ call/3,
+ pcall/1,
+ pcall/2
+]).
+-export([
+ get_state/0,
+ get_state/1,
+ update_config/0,
+ get_queue_depths/0,
+ get_queue_depths/1,
+ get_concurrency/0,
+ set_concurrency/1,
+ get_counters/0
+]).
+
+
+-include_lib("ioq/include/ioq.hrl").
+
+
+-define(DEFAULT_RESIZE_LIMIT, 1000).
+-define(DEFAULT_CONCURRENCY, 1).
+-define(DEFAULT_SCALE_FACTOR, 2.0).
+-define(DEFAULT_MAX_PRIORITY, 10000.0).
+
+
+-record(state, {
+ reqs :: khash:khash(),
+ waiters :: khash:khash(),
+ queue :: hqueue:hqueue(),
+ concurrency = ?DEFAULT_CONCURRENCY :: pos_integer(),
+ iterations = 0 :: non_neg_integer(),
+ class_p :: khash:khash(), %% class priorities
+ user_p :: khash:khash(), %% user priorities
+ shard_p :: khash:khash(), %% shard priorities
+ scale_factor = ?DEFAULT_SCALE_FACTOR :: float(),
+ dedupe = true :: boolean(),
+ resize_limit = ?DEFAULT_RESIZE_LIMIT :: pos_integer(),
+ next_key = 1 :: pos_integer(),
+ server_name :: atom(),
+ scheduler_id = 0 :: non_neg_integer(),
+ max_priority = ?DEFAULT_MAX_PRIORITY :: float()
+}).
+
+
+-type state() :: #state{}.
+-type waiter_key() :: {pid(), integer()} | pos_integer().
+-type priority() :: float(). %% should be non_negative_float().
+
+%% Hacky queue_depth type due to existing fixed element lists for JSON in API
+%% Actual type is:
+%% [
+%% {compaction, non_neg_integer()},
+%% {replication, non_neg_integer()},
+%% {low, non_neg_integer()},
+%% {channels, {[{username(), [Interactive, DbUpdate, ViewUpdate]}]}}
+%% ]
+%% when Interactive = DbUpdate = ViewUpdate = non_neg_integer().
+-type depths() :: compaction | replication | low.
+-type user_depth() :: {binary(), [non_neg_integer()]}.
+-type depth_ele() :: {depths(), non_neg_integer()} | user_depth().
+-type queue_depths() :: [depth_ele()].
+-type read_write() :: reads | writes | unknown.
+
+
+-define(SERVER_ID(SID), list_to_atom("ioq_server_" ++ integer_to_list(SID))).
+
+
+-spec call(pid(), term(), io_dimensions()) -> term().
+call(Fd, Msg, Dimensions) ->
+ Req0 = #ioq_request{
+ fd = Fd,
+ msg = Msg,
+ t0 = os:timestamp()
+ },
+ Req = add_request_dimensions(Req0, Dimensions),
+ Class = atom_to_list(Req#ioq_request.class),
+ case config:get_boolean("ioq2.bypass", Class, false) of
+ true ->
+ RW = rw(Msg),
+ couch_stats:increment_counter([couchdb, io_queue2, bypassed_count]),
+ couch_stats:increment_counter(
+ [couchdb, io_queue2, RW, bypassed_count]),
+ gen_server:call(Fd, Msg, infinity);
+ _ ->
+ DispatchStrategy = config:get(
+ "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER),
+ Server = case DispatchStrategy of
+ ?DISPATCH_RANDOM ->
+ maybe_seed(),
+ SID = random:uniform(erlang:system_info(schedulers)),
+ ?SERVER_ID(SID);
+ ?DISPATCH_FD_HASH ->
+ NumSchedulers = erlang:system_info(schedulers),
+ SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
+ ?SERVER_ID(SID);
+ ?DISPATCH_SINGLE_SERVER ->
+ ?SERVER_ID(1);
+ _ ->
+ SID = erlang:system_info(scheduler_id),
+ ?SERVER_ID(SID)
+ end,
+ gen_server:call(Server, Req, infinity)
+ end.
+
+
+-spec pcall(any()) -> any().
+pcall(Msg) ->
+ pcall(Msg, 500).
+
+
+-spec pcall(any(), non_neg_integer()) -> any().
+pcall(Msg, Timeout) ->
+ {MainPid, MainRef} = spawn_monitor(fun() ->
+ PidRefs = lists:map(fun(Name) ->
+ spawn_monitor(fun() ->
+ Resp = gen_server:call(Name, Msg, Timeout),
+ exit({resp_ok, Resp})
+ end)
+ end, ioq_sup:get_ioq2_servers()),
+ Resps = lists:map(fun({Pid, _}) ->
+ receive
+ {'DOWN', _, _, Pid, {resp_ok, Resp}} ->
+ Resp;
+ {'DOWN', _, _, Pid, Error} ->
+ exit(Error)
+ end
+ end, PidRefs),
+ exit({resp_ok, Resps})
+ end),
+ receive
+ {'DOWN', _, _, MainPid, {resp_ok, Resps}} ->
+ {ok, Resps};
+ {'DOWN', _, _, MainPid, Error} ->
+ {error, Error}
+ after Timeout ->
+ erlang:demonitor(MainRef, [flush]),
+ exit(MainPid, kill),
+ {error, timeout}
+ end.
+
+
+-spec get_queue_depths() -> queue_depths().
+get_queue_depths() ->
+ case pcall(get_pending_reqs, 500) of
+ {ok, PReqs} ->
+ get_queue_depths([Req || {_Priority, Req} <- lists:flatten(PReqs)]);
+ {error, _} ->
+ [
+ {compaction, ?BAD_MAGIC_NUM},
+ {replication, ?BAD_MAGIC_NUM},
+ {low, ?BAD_MAGIC_NUM},
+ {channels, {[]}}
+ ]
+ end.
+
+
+-spec get_queue_depths([ioq_request()]) -> queue_depths().
+get_queue_depths(Reqs) ->
+ {ok, Users0} = khash:new(),
+ {Compaction, Replication, Low, Users} = lists:foldl(
+ fun
+ (#ioq_request{class=db_compact}, {C, R, L, U}) ->
+ {C+1, R, L, U};
+ (#ioq_request{class=view_compact}, {C, R, L, U}) ->
+ {C+1, R, L, U};
+ (#ioq_request{class=internal_repl}, {C, R, L, U}) ->
+ {C, R+1, L, U};
+ (#ioq_request{class=low}, {C, R, L, U}) ->
+ {C, R, L+1, U};
+ (#ioq_request{class=Class, user=User}, {C, R, L, U}) ->
+ [UI0, UDB0, UV0] = case khash:get(U, User) of
+ undefined ->
+ [0,0,0];
+ UC0 ->
+ UC0
+ end,
+ UC = case Class of
+ db_update ->
+ [UI0, UDB0+1, UV0];
+ view_update ->
+ [UI0, UDB0, UV0+1];
+ _Interactive ->
+ [UI0+1, UDB0, UV0]
+ end,
+ ok = khash:put(U, User, UC),
+ {C, R, L, U}
+ end,
+ {0, 0, 0, Users0},
+ Reqs
+ ),
+ [
+ {compaction, Compaction},
+ {replication, Replication},
+ {low, Low},
+ {channels, {khash:to_list(Users)}}
+ ].
+
+
+-spec get_concurrency() -> non_neg_integer().
+get_concurrency() ->
+ case pcall(get_concurrency, 500) of
+ {ok, Concurrencies} ->
+ lists:sum(Concurrencies);
+ {error, _} ->
+ ?BAD_MAGIC_NUM
+ end.
+
+
+-spec set_concurrency(non_neg_integer()) -> non_neg_integer().
+set_concurrency(C) when is_integer(C), C > 0 ->
+ lists:foldl(
+ fun(Pid, Total) ->
+ {ok, Old} = gen_server:call(Pid, {set_concurrency, C}, 1000),
+ Total + Old
+ end,
+ 0,
+ ioq_sup:get_ioq2_servers()
+ );
+set_concurrency(_) ->
+ erlang:error(badarg).
+
+
+get_counters() ->
+ undefined.
+
+
+%% @equiv get_state(?SERVER_ID(1))
+-spec get_state() -> any().
+get_state() ->
+ get_state(?SERVER_ID(1)).
+
+
+%% Returns a copy of #state{} with list representations of the khash/hqueue objects
+-spec get_state(atom()) -> any().
+get_state(Server) ->
+ gen_server:call(Server, get_state, infinity).
+
+
+-spec update_config() -> ok.
+update_config() ->
+ gen_server:call(?SERVER_ID(1), update_config, infinity).
+
+
+start_link(Name, SID, Bind) ->
+ Options = case Bind of
+ true -> [{scheduler, SID}];
+ false -> []
+ end,
+ gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
+
+
+init([Name, SID]) ->
+ {ok, HQ} = hqueue:new(),
+ {ok, Reqs} = khash:new(),
+ {ok, Waiters} = khash:new(),
+ State = #state{
+ queue = HQ,
+ reqs = Reqs,
+ waiters = Waiters,
+ server_name = Name,
+ scheduler_id = SID
+ },
+ {ok, update_config_int(State)}.
+
+
+handle_call(get_state, _From, State) ->
+ Resp = State#state{
+ user_p = khash:to_list(State#state.user_p),
+ class_p = khash:to_list(State#state.class_p),
+ shard_p = khash:to_list(State#state.shard_p),
+ reqs = khash:to_list(State#state.reqs),
+ waiters = khash:to_list(State#state.waiters),
+ queue = hqueue:to_list(State#state.queue)
+ },
+
+ {reply, Resp, State, 0};
+handle_call(#ioq_request{} = Req, From, State) ->
+ {noreply, enqueue_request(Req#ioq_request{from=From}, State), 0};
+handle_call({hqueue, Method, Args}, _From, #state{queue=HQ}=State) ->
+ Resp = erlang:apply(hqueue, Method, [HQ | Args]),
+ {reply, Resp, State, 0};
+handle_call(update_config, _From, State) ->
+ {reply, ok, update_config_int(State), 0};
+handle_call(get_concurrency, _From, State) ->
+ {reply, State#state.concurrency, State, 0};
+handle_call({set_concurrency, C}, _From, State) when is_integer(C), C > 0 ->
+ {reply, {ok, State#state.concurrency}, State#state{concurrency = C}, 0};
+handle_call(get_reqs, _From, #state{reqs=Reqs}=State) ->
+ {reply, khash:to_list(Reqs), State, 0};
+handle_call(get_pending_reqs, _From, #state{queue=HQ}=State) ->
+ {reply, hqueue:to_list(HQ), State, 0};
+handle_call(get_counters, _From, State) ->
+ {reply, undefined, State, 0};
+handle_call(_, _From, State) ->
+ {reply, ok, State, 0}.
+
+
+handle_cast(update_config, State) ->
+ {noreply, update_config_int(State), 0};
+handle_cast(_Msg, State) ->
+ {noreply, State, 0}.
+
+
+handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
+ case khash:get(Reqs, Ref) of
+ undefined ->
+ ok;
+ #ioq_request{ref=Ref}=Req ->
+ ok = khash:del(Reqs, Ref),
+ TResponse = os:timestamp(),
+ ServiceTime = time_delta(TResponse, Req#ioq_request.tsub),
+ IOWait = time_delta(TResponse, Req#ioq_request.t0),
+ couch_stats:update_histogram(
+ [couchdb, io_queue2, svctm], ServiceTime),
+ couch_stats:update_histogram([couchdb, io_queue2, iowait], IOWait),
+ erlang:demonitor(Ref, [flush]),
+ send_response(State#state.waiters, Req, Reply)
+ end,
+ {noreply, State, 0};
+handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) ->
+ case khash:get(Reqs, Ref) of
+ undefined ->
+ ok;
+ #ioq_request{ref=Ref}=Req ->
+ couch_stats:increment_counter([couchdb, io_queue2, io_errors]),
+ ok = khash:del(Reqs, Ref),
+ send_response(State#state.waiters, Req, {'EXIT', Reason})
+ end,
+ {noreply, State, 0};
+handle_info(timeout, State) ->
+ {noreply, maybe_submit_request(State)};
+handle_info(_Info, State) ->
+ {noreply, State, 0}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+code_change(_OldVsn, #state{}=State, _Extra) ->
+ {ok, State}.
+
+
+-spec update_config_int(state()) -> state().
+update_config_int(State) ->
+ Concurrency = config:get_integer("ioq2", "concurrency", ?DEFAULT_CONCURRENCY),
+ ResizeLimit = config:get_integer("ioq2", "resize_limit", ?DEFAULT_RESIZE_LIMIT),
+ DeDupe = config:get_boolean("ioq2", "dedupe", true),
+
+ ScaleFactor = ioq_config:to_float(
+ config:get("ioq2", "scale_factor"),
+ ?DEFAULT_SCALE_FACTOR
+ ),
+
+ MaxPriority = ioq_config:to_float(
+ config:get("ioq2", "max_priority"),
+ ?DEFAULT_MAX_PRIORITY
+ ),
+
+ {ok, ClassP} = ioq_config:build_class_priorities(),
+ {ok, UserP} = ioq_config:build_user_priorities(),
+ {ok, ShardP} = ioq_config:build_shard_priorities(),
+
+ State#state{
+ user_p = UserP,
+ class_p = ClassP,
+ shard_p = ShardP,
+ scale_factor = ScaleFactor,
+ concurrency = Concurrency,
+ dedupe = DeDupe,
+ resize_limit = ResizeLimit,
+ max_priority = MaxPriority
+ }.
+
+
+-spec maybe_submit_request(state()) -> state().
+maybe_submit_request(#state{reqs=Reqs, concurrency=C}=State) ->
+ NumReqs = khash:size(Reqs),
+ case NumReqs < C of
+ true ->
+ case make_next_request(State) of
+ State ->
+ State;
+ NewState when NumReqs >= C-1 ->
+ NewState;
+ NewState ->
+ maybe_submit_request(NewState)
+ end;
+ false ->
+ State
+ end.
+
+
+-spec make_next_request(state()) -> state().
+make_next_request(#state{queue=HQ}=State) ->
+ case hqueue:extract_max(HQ) of
+ {error, empty} ->
+ State;
+ {Priority, #ioq_request{} = Req} ->
+ submit_request(Req#ioq_request{fin_priority=Priority}, State)
+ end.
+
+
+-spec submit_request(ioq_request(), state()) -> state().
+submit_request(Req, #state{iterations=I, resize_limit=RL}=State) when I >= RL ->
+ ok = hqueue:scale_by(State#state.queue, State#state.scale_factor),
+ submit_request(Req, State#state{iterations=0});
+submit_request(Req, #state{iterations=Iterations}=State) ->
+ #ioq_request{
+ fd = Fd,
+ msg = Call,
+ class = Class,
+ t0 = T0
+ } = Req,
+ #state{reqs = Reqs} = State,
+
+ % make the request
+ Ref = erlang:monitor(process, Fd),
+ Fd ! {'$gen_call', {self(), Ref}, Call},
+
+ % record some stats
+ RW = rw(Call),
+
+ SubmitTime = os:timestamp(),
+ Latency = time_delta(SubmitTime, T0),
+ couch_stats:increment_counter([couchdb, io_queue2, Class, count]),
+ couch_stats:increment_counter([couchdb, io_queue2, RW, count]),
+ couch_stats:update_histogram([couchdb, io_queue2, submit_delay], Latency),
+ khash:put(Reqs, Ref, Req#ioq_request{tsub=SubmitTime, ref=Ref}),
+ State#state{iterations=Iterations+1}.
+
+
+-spec send_response(khash:khash(), ioq_request(), term()) -> [ok].
+send_response(Waiters, #ioq_request{key=Key}, Reply) ->
+ Waiting = khash:get(Waiters, Key),
+ khash:del(Waiters, Key),
+ [gen_server:reply(W, Reply) || W <- Waiting].
+
+
+-spec waiter_key(ioq_request(), state()) -> {waiter_key(), state()}.
+waiter_key(Req, State) ->
+ case {State#state.dedupe, Req#ioq_request.msg} of
+ {true, {pread_iolist, Pos}} ->
+ {{Req#ioq_request.fd, Pos}, State};
+ _ ->
+ Next = State#state.next_key,
+ {Next, State#state{next_key = Next + 1}}
+ end.
+
+
+-spec enqueue_request(ioq_request(), state()) -> state().
+enqueue_request(Req, #state{queue=HQ, waiters=Waiters}=State0) ->
+ #ioq_request{
+ from = From,
+ msg = Msg
+ } = Req,
+ {ReqKey, State} = waiter_key(Req, State0),
+ RW = rw(Msg),
+
+ couch_stats:increment_counter([couchdb, io_queue2, queued]),
+ couch_stats:increment_counter([couchdb, io_queue2, RW, queued]),
+
+ case khash:get(State#state.waiters, ReqKey, not_found) of
+ not_found ->
+ Priority = prioritize_request(Req, State),
+ Req1 = Req#ioq_request{
+ key = ReqKey,
+ init_priority = Priority
+ },
+ hqueue:insert(HQ, Priority, Req1),
+ khash:put(State#state.waiters, ReqKey, [From]);
+ Pids ->
+ couch_stats:increment_counter([couchdb, io_queue2, merged]),
+ khash:put(Waiters, ReqKey, [From | Pids])
+ end,
+ State.
+
+
+-spec add_request_dimensions(ioq_request(), io_dimensions()) -> ioq_request().
+add_request_dimensions(Request, {Class, Shard}) ->
+ add_request_dimensions(Request, {Class, Shard, undefined});
+add_request_dimensions(Request, {Class, Shard0, GroupId}) ->
+ {Shard, User, DbName} = case {Class, Shard0} of
+ {interactive, "dbcopy"} ->
+ {undefined, undefined, undefined};
+ {db_meta, security} ->
+ {undefined, undefined, undefined};
+ _ ->
+ Shard1 = filename:rootname(Shard0),
+ {User0, DbName0} = shard_info(Shard1),
+ {Shard1, User0, DbName0}
+ end,
+ Request#ioq_request{
+ shard = Shard,
+ user = User,
+ db = DbName,
+ ddoc = GroupId,
+ class = Class
+ };
+add_request_dimensions(Request, undefined) ->
+ Request#ioq_request{class=other}.
+
+
+-spec shard_info(dbname()) -> {any(), any()}.
+shard_info(Shard) ->
+ try split(Shard) of
+ [<<"shards">>, _, <<"heroku">>, AppId, DbName] ->
+ {<<AppId/binary, ".heroku">>, DbName};
+ [<<"shards">>, _, DbName] ->
+ {system, DbName};
+ [<<"shards">>, _, Account, DbName] ->
+ {Account, DbName};
+ [<<"shards">>, _, Account | DbParts] ->
+ {Account, filename:join(DbParts)};
+ _ ->
+ {undefined, undefined}
+ catch _:_ ->
+ {undefined, undefined}
+ end.
+
+
+-spec split(binary()) -> [binary()]
+ ; ([binary()]) -> [binary()].
+split(B) when is_binary(B) ->
+ split(B, 0, 0, []);
+split(B) ->
+ B.
+
+-spec split(binary(), non_neg_integer(), non_neg_integer(), [binary()]) -> [binary()].
+split(B, O, S, Acc) ->
+ case B of
+ <<_:O/binary>> ->
+ Len = O - S,
+ <<_:S/binary, Part:Len/binary>> = B,
+ lists:reverse(Acc, [Part]);
+ <<_:O/binary, $/, _/binary>> ->
+ Len = O - S,
+ <<_:S/binary, Part:Len/binary, _/binary>> = B,
+ split(B, O+1, O+1, [Part | Acc]);
+ _ ->
+ split(B, O+1, S, Acc)
+ end.
+
+-spec time_delta(T1, T0) -> Tdiff when
+ T1 :: erlang:timestamp(),
+ T0 :: erlang:timestamp(),
+ Tdiff :: integer().
+time_delta(T1, T0) ->
+ trunc(timer:now_diff(T1, T0) / 1000).
+
+
+-spec rw(io_dimensions()) -> read_write().
+rw({pread_iolist, _}) ->
+ reads;
+rw({append_bin, _}) ->
+ writes;
+rw({append_bin, _, _}) ->
+ writes;
+rw(_) ->
+ unknown.
+
+
+-spec prioritize_request(ioq_request(), state()) -> priority().
+prioritize_request(Req, State) ->
+ #state{
+ class_p = ClassP,
+ user_p = UserP,
+ shard_p = ShardP,
+ max_priority = Max
+ } = State,
+ case ioq_config:prioritize(Req, ClassP, UserP, ShardP) of
+ Priority when Priority < 0.0 -> 0.0;
+ Priority when Priority > Max -> Max;
+ Priority -> Priority
+ end.
+
+
+-spec maybe_seed() -> {integer(), integer(), integer()}.
+maybe_seed() ->
+ case get(random_seed) of
+ undefined ->
+ <<A:32, B:32, C:32>> = crypto:strong_rand_bytes(12),
+ Seed = {A, B, C},
+ random:seed(Seed),
+ Seed;
+ Seed ->
+ Seed
+ end.
+
+
+%% ioq_server2 Tests
+
+
+-ifdef(TEST).
+
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+mock_server() ->
+ mock_server([]).
+
+
+mock_server(Config) ->
+ meck:new(config),
+ meck:expect(config, get, fun(Group) ->
+ couch_util:get_value(Group, Config, [])
+ end),
+ meck:expect(config, get, fun(_,_) ->
+ undefined
+ end),
+ meck:expect(config, get, fun("ioq2", _, Default) ->
+ Default
+ end),
+ meck:expect(config, get_integer, fun("ioq2", _, Default) ->
+ Default
+ end),
+ meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
+ Default
+ end),
+ {ok, State} = ioq_server2:init([?SERVER_ID(1), 1]),
+ State.
+
+
+unmock_server(_) ->
+ true = meck:validate(config),
+ ok = meck:unload(config).
+
+
+empty_config_test_() ->
+ {
+ "Empty config tests",
+ {
+ foreach,
+ fun mock_server/0,
+ fun unmock_server/1,
+ [
+ fun test_basic_server_config/1,
+ fun test_simple_request_priority/1,
+ fun test_simple_dedupe/1,
+ fun test_io_error/1
+ ]
+ }
+ }.
+
+
+simple_config_test_() ->
+ {
+ "Simple config tests",
+ {
+ foreach,
+ fun() ->
+ Config = [
+ {"ioq2.classes", [{"db_compact", "0.9"}]},
+ {"ioq2", [{"resize_limit", "10"}]}
+ ],
+ mock_server(Config)
+ end,
+ fun unmock_server/1,
+ [
+ fun test_simple_config/1,
+ fun test_auto_scale/1
+ ]
+ }
+ }.
+
+
+priority_extremes_test_() ->
+ {
+ "Test min/max priorities",
+ {
+ foreach,
+ fun() ->
+ Config = [
+ {"ioq2.classes", [
+ {"db_compact", "9999999.0"},
+ {"interactive", "-0.00000000000000001"}
+ ]}
+ ],
+ mock_server(Config)
+ end,
+ fun unmock_server/1,
+ [
+ fun test_min_max_priorities/1
+ ]
+ }
+ }.
+
+
+queue_depths_test_() ->
+ Foo = <<"foo">>,
+ Bar = <<"bar">>,
+ Reqs = [
+ #ioq_request{user=Foo, class=db_compact},
+ #ioq_request{user=Bar, class=db_compact},
+ #ioq_request{user=Bar, class=view_compact},
+ #ioq_request{user=Foo, class=internal_repl},
+ #ioq_request{user=Bar, class=internal_repl},
+ #ioq_request{user=Bar, class=internal_repl},
+ #ioq_request{class=low},
+
+ #ioq_request{user=Foo, class=interactive},
+ #ioq_request{user=Foo, class=interactive},
+ #ioq_request{user=Foo, class=db_update},
+ #ioq_request{user=Foo, class=view_update},
+ #ioq_request{user=Foo, class=view_update},
+ #ioq_request{user=Foo, class=view_update},
+ #ioq_request{user=Foo, class=view_update},
+
+ #ioq_request{user=Bar, class=interactive},
+ #ioq_request{user=Bar, class=db_update},
+ #ioq_request{user=Bar, class=db_update},
+ #ioq_request{user=Bar, class=db_update},
+ #ioq_request{user=Bar, class=view_update}
+ ],
+ Expected = [
+ {compaction, 3},
+ {replication, 3},
+ {low, 1},
+ {channels, {[
+ {<<"foo">>, [2,1,4]},
+ {<<"bar">>, [1,3,1]}
+ ]}}
+ ],
+
+ {
+ "Test queue depth stats",
+ ?_assertEqual(
+ Expected,
+ get_queue_depths(Reqs)
+ )
+ }.
+
+
+test_basic_server_config(St0) ->
+ {reply, RespState, _St1, 0} = handle_call(get_state, pid, St0),
+ [
+ ?_assertEqual([], RespState#state.user_p),
+ ?_assertEqual(
+ lists:sort(?DEFAULT_CLASS_PRIORITIES),
+ lists:sort(RespState#state.class_p)
+ ),
+ ?_assertEqual([], RespState#state.shard_p)
+ ].
+
+
+test_simple_request_priority(St0) ->
+ From = pid1,
+ Request0 = #ioq_request{class=db_compact},
+ Priority = prioritize_request(Request0, St0),
+ Request1 = Request0#ioq_request{
+ init_priority = Priority,
+ from = From,
+ key = St0#state.next_key
+ },
+ {noreply, St1, 0} = handle_call(Request0, From, St0),
+ {reply, RespState, _St2, 0} = handle_call(get_state, From, St1),
+ [
+ ?_assertEqual(
+ [{Priority, Request1}],
+ RespState#state.queue
+ )
+ ].
+
+
+test_simple_dedupe(St0) ->
+ FromA = pid1,
+ FromB = pid2,
+ Fd = fd,
+ Pos = 1234,
+ Msg = {pread_iolist, Pos},
+ Request0 = #ioq_request{
+ class=db_compact,
+ fd = Fd,
+ msg = Msg
+ },
+ {ReqKey, St1} = waiter_key(Request0, St0),
+ Priority = prioritize_request(Request0, St1),
+ Request1A = Request0#ioq_request{
+ init_priority = Priority,
+ from = FromA,
+ key = {Fd, Pos}
+ },
+ _Request1B = Request0#ioq_request{
+ init_priority = Priority,
+ from = FromB,
+ key = {Fd, Pos}
+ },
+ {noreply, St2, 0} = handle_call(Request0, FromA, St1),
+ {noreply, St3, 0} = handle_call(Request0, FromB, St2),
+ {reply, RespState, _St4, 0} = handle_call(get_state, FromA, St3),
+ [
+ ?_assertEqual(
+ [{Priority, Request1A}],
+ RespState#state.queue
+ ),
+ ?_assertEqual(
+ [{ReqKey, [FromB, FromA]}],
+ RespState#state.waiters
+ )
+ ].
+
+
+test_simple_config(St) ->
+ RequestA = #ioq_request{},
+ RequestB = #ioq_request{class=db_compact},
+ PriorityA = prioritize_request(RequestA, St),
+ PriorityB = prioritize_request(RequestB, St),
+
+ [
+ ?_assertEqual(
+ 1.0,
+ PriorityA
+ ),
+ ?_assertEqual(
+ 0.9,
+ PriorityB
+ )
+ ].
+
+
+test_min_max_priorities(St) ->
+ RequestA = #ioq_request{class=interactive},
+ RequestB = #ioq_request{class=db_compact},
+ PriorityA = prioritize_request(RequestA, St),
+ PriorityB = prioritize_request(RequestB, St),
+
+ [
+ ?_assertEqual(
+ 0.0,
+ PriorityA
+ ),
+ ?_assertEqual(
+ ?DEFAULT_MAX_PRIORITY,
+ PriorityB
+ )
+ ].
+
+
+test_auto_scale(#state{queue=HQ}=St0) ->
+ %% start with iterations=2 so we can tell when we auto-scaled
+ St1 = St0#state{resize_limit=10, iterations=2},
+ Pid = spawn(fun() -> receive baz -> ok end end),
+ T0 = os:timestamp(),
+ BaseReq = #ioq_request{t0=T0, fd=Pid},
+
+ RequestA = BaseReq#ioq_request{ref=make_ref()},
+ RequestB = BaseReq#ioq_request{class=db_compact, ref=make_ref()},
+ PriorityA = prioritize_request(RequestA, St1),
+ PriorityB = prioritize_request(RequestB, St1),
+
+ {noreply, St2, 0} = handle_call(RequestB, Pid, St1),
+ {noreply, St3, 0} = handle_call(RequestA, Pid, St2),
+
+ {_, #ioq_request{init_priority=PriorityA2}} = hqueue:extract_max(HQ),
+ Tests0 = [?_assertEqual(PriorityA, PriorityA2)],
+ {_St, Tests} = lists:foldl(
+ fun(_N, {#state{iterations=I, resize_limit=RL}=StN0, TestsN}) ->
+ ReqN = BaseReq#ioq_request{ref=make_ref()},
+ ExpectedPriority = case I == 1 of
+ false -> PriorityA;
+ true -> PriorityB
+ end,
+ {noreply, StN1, 0} = handle_call(ReqN, Pid, StN0),
+ StN2 = submit_request(ReqN, StN1),
+ {_, #ioq_request{init_priority=PriorityN}} = hqueue:extract_max(HQ),
+ {
+ StN2,
+ [?_assertEqual(ExpectedPriority, PriorityN) | TestsN]
+ }
+ end,
+ {St3, Tests0},
+ lists:seq(1, St3#state.resize_limit + 7)
+ ),
+ lists:reverse(Tests).
+
+
+all_test_() ->
+ {setup, fun setup/0, fun cleanup/1, fun instantiate/1}.
+
+
+many_clients_test_() ->
+ FDCount = 50,
+ ClientCount = 10,
+ MaxDelay = 20,
+ {
+ setup,
+ fun() -> setup_many(FDCount, MaxDelay) end,
+ fun cleanup/1,
+ fun(Servers) -> test_many_clients(Servers, ClientCount) end
+ }.
+
+
+setup() ->
+ meck:new(config, [passthrough]),
+ meck:expect(config, get_boolean,
+ fun
+ ("ioq2", "enabled", _) ->
+ true;
+ ("ioq2", "server_per_scheduler", _) ->
+ false;
+ (_, _, Default) ->
+ Default
+ end
+ ),
+ {ok, _} = application:ensure_all_started(ioq),
+ FakeServer = fun(F) ->
+ receive {'$gen_call', {Pid, Ref}, Call} ->
+ Pid ! {Ref, {reply, Call}}
+ end,
+ F(F)
+ end,
+ spawn(fun() -> FakeServer(FakeServer) end).
+
+
+setup_many(Count, RespDelay) ->
+ {ok, _} = application:ensure_all_started(ioq),
+ meck:new(config, [passthrough]),
+ meck:expect(config, get_boolean,
+ fun
+ ("ioq2", "enabled", _) ->
+ true;
+ ("ioq2", "server_per_scheduler", _) ->
+ false;
+ (_, _, Default) ->
+ Default
+ end
+ ),
+ FakeServer = fun(F) ->
+ receive {'$gen_call', {Pid, Ref}, Call} ->
+ timer:sleep(random:uniform(RespDelay)),
+ Pid ! {Ref, {reply, Call}}
+ end,
+ F(F)
+ end,
+ [spawn(fun() -> FakeServer(FakeServer) end) || _ <- lists:seq(1, Count)].
+
+
+cleanup(Server) when not is_list(Server) ->
+ cleanup([Server]);
+cleanup(Servers) ->
+ ok = application:stop(ioq),
+ true = meck:validate(config),
+ ok = meck:unload(config),
+ [exit(Server, kill) || Server <- Servers].
+
+
+instantiate(S) ->
+ Old = ?DEFAULT_CONCURRENCY * length(ioq_sup:get_ioq2_servers()),
+ [{inparallel, lists:map(fun(IOClass) ->
+ lists:map(fun(Shard) ->
+ check_call(S, make_ref(), priority(IOClass, Shard))
+ end, shards())
+ end, io_classes())},
+ ?_assertEqual(Old, ioq:set_disk_concurrency(10)),
+ ?_assertError(badarg, ioq:set_disk_concurrency(0)),
+ ?_assertError(badarg, ioq:set_disk_concurrency(-1)),
+ ?_assertError(badarg, ioq:set_disk_concurrency(foo))].
+
+
+check_call(Server, Call, Priority) ->
+ ?_assertEqual({reply, Call}, ioq_server2:call(Server, Call, Priority)).
+
+
+io_classes() -> [interactive, view_update, db_compact, view_compact,
+ internal_repl, other, db_meta].
+
+
+shards() ->
+ [
+ <<"shards/0-1/heroku/app928427/couchrest.1317609656.couch">>,
+ <<"shards/0-1/foo">>,
+ <<"shards/0-3/foo">>,
+ <<"shards/0-1/bar">>,
+ <<"shards/0-1/kocolosk/stats.1299297461.couch">>,
+ <<"shards/0-1/kocolosk/my/db.1299297457.couch">>,
+ other
+ ].
+
+
+priority(view_update, Shard) ->
+ {view_update, Shard, <<"_design/foo">>};
+priority(Any, Shard) ->
+ {Any, Shard}.
+
+
+test_many_clients(Servers, ClientCount) ->
+ ClientFun = fun() ->
+ ok = lists:foreach(fun(IOClass) ->
+ ok = lists:foreach(fun(Shard) ->
+ Server = random_server(Servers),
+ Ref = make_ref(),
+ Priority = priority(IOClass, Shard),
+ {reply, Ref} = ioq_server2:call(Server, Ref, Priority),
+ ok
+ end, shards())
+ end, io_classes()),
+ ok
+ end,
+ ok = lists:foreach(fun(_) -> spawn_monitor(ClientFun) end, lists:seq(1, ClientCount)),
+
+ Status = wait_for_success(ClientCount),
+ ?_assert(Status).
+
+
+wait_for_success(0) ->
+ true;
+wait_for_success(Count) when Count > 0 ->
+ receive
+ {'DOWN', _Ref, process, _Pid, normal} ->
+ wait_for_success(Count - 1);
+ Msg ->
+ ?debugFmt("UNEXPECTED CLIENT EXIT: ~p~n", [Msg]),
+ false
+ end.
+
+
+random_server(Servers) ->
+ lists:nth(random:uniform(length(Servers)), Servers).
+
+
+test_io_error(#state{waiters=Waiters, reqs=Reqs}=State) ->
+ Key = asdf,
+ Ref = make_ref(),
+ RefTag = make_ref(),
+ Req = #ioq_request{ref=Ref, key=Key},
+ khash:put(Waiters, Key, [{self(), RefTag}]),
+ khash:put(Reqs, Ref, Req),
+ Error = {exit, foo},
+ {noreply, _State1, 0} = handle_info({'DOWN', Ref, baz, zab, Error}, State),
+ Resp = receive
+ {RefTag, {'EXIT', Error}} ->
+ {ok, Error};
+ Else ->
+ {error, Else}
+ after 5000 ->
+ {error, timeout}
+ end,
+ ?_assertEqual({ok, Error}, Resp).
+
+
+-endif.
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index c4d04a9..7ea6284 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -13,6 +13,7 @@
-module(ioq_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).
+-export([get_ioq2_servers/0]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
@@ -21,4 +22,28 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
- {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}.
+ ok = ioq_config_listener:subscribe(),
+ IOQ2Children = ioq_server2_children(),
+ {ok, {
+ {one_for_one, 5, 10},
+ [
+ ?CHILD(ioq_server, worker),
+ ?CHILD(ioq_osq, worker)
+ | IOQ2Children
+ ]
+ }}.
+
+ioq_server2_children() ->
+ Bind = config:get_boolean("ioq2", "bind_to_schedulers", false),
+ ioq_server2_children(erlang:system_info(schedulers), Bind).
+
+ioq_server2_children(Count, Bind) ->
+ lists:map(fun(I) ->
+ Name = list_to_atom("ioq_server_" ++ integer_to_list(I)),
+ {Name, {ioq_server2, start_link, [Name, I, Bind]}, permanent, 5000, worker, [Name]}
+ end, lists:seq(1, Count)).
+
+get_ioq2_servers() ->
+ lists:map(fun(I) ->
+ list_to_atom("ioq_server_" ++ integer_to_list(I))
+ end, lists:seq(1, erlang:system_info(schedulers))).
diff --git a/test/ioq_config_tests.erl b/test/ioq_config_tests.erl
new file mode 100644
index 0000000..d3fee81
--- /dev/null
+++ b/test/ioq_config_tests.erl
@@ -0,0 +1,157 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_config_tests).
+
+
+-include_lib("ioq/include/ioq.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+
+-define(USERS_CONFIG, [{"bar","2.4"},{"baz","4.0"},{"foo","1.2"}]).
+-define(CLASSES_CONFIG, [{"view_compact","2.4"},{"db_update","1.9"}]).
+-define(SHARDS_CONFIG, [
+ {
+ {<<"shards/00000000-1fffffff/foo/pizza_db">>, db_update},
+ 5.0
+ },
+ {
+ {<<"shards/00000000-1fffffff/foo/pizza_db">>, view_update},
+ 7.0
+ }
+]).
+
+
+priorities_test_() ->
+ {ok, ShardP} = ioq_config:build_shard_priorities(?SHARDS_CONFIG),
+ {ok, UserP} = ioq_config:build_user_priorities(?USERS_CONFIG),
+ {ok, ClassP} = ioq_config:build_class_priorities(?CLASSES_CONFIG),
+ Tests = [
+ %% {User, Shard, Class, UP * SP * CP}
+ {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, db_update, 1.2 * 5.0 * 1.9},
+ {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, view_update, 1.2 * 7.0 * 1.0},
+ {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, view_compact, 1.2 * 1.0 * 2.4},
+ {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, db_compact, 1.2 * 1.0 * 0.0001},
+ {<<"foo">>, <<"shards/00000000-1fffffff/foo/pizza_db">>, internal_repl, 1.2 * 1.0 * 0.001},
+ {<<"baz">>, undefined, internal_repl, 4.0 * 1.0 * 0.001}
+ ],
+ lists:map(
+ fun({User, Shard, Class, Priority}) ->
+ Req = #ioq_request{user=User, shard=Shard, class=Class},
+ ?_assertEqual(
+ Priority,
+ ioq_config:prioritize(Req, ClassP, UserP, ShardP)
+ )
+ end,
+ Tests
+ ).
+
+
+parse_shard_string_test_() ->
+ Shard = "shards/00000000-1fffffff/foo/pizza_db",
+ Classes = ["db_update", "view_update", "view_compact", "db_compact"],
+ lists:map(
+ fun(Class) ->
+ ShardString = Shard ++ "||" ++ Class,
+ ?_assertEqual(
+ {list_to_binary(Shard), list_to_existing_atom(Class)},
+ ioq_config:parse_shard_string(ShardString)
+ )
+ end,
+ Classes
+ ).
+
+
+parse_bad_string_test_() ->
+ Shard = "shards/00000000-1fffffff/foo/pizza_db$$$$$ASDF",
+ ?_assertEqual(
+ {error, Shard},
+ ioq_config:parse_shard_string(Shard)
+ ).
+
+
+to_float_test_() ->
+ Default = 123456789.0,
+ Tests = [
+ {0.0, 0},
+ {0.0, "0"},
+ {1.0, "1"},
+ {1.0, 1},
+ {7.9, 7.9},
+ {7.9, "7.9"},
+ {79.0, "79"},
+ {Default, "asdf"}
+ ],
+ [?_assertEqual(E, ioq_config:to_float(T, Default)) || {E, T} <- Tests].
+
+
+config_set_test_() ->
+ {
+ "Test ioq_config setters",
+ {
+ foreach,
+ fun() -> test_util:start_applications([config, couch_log]) end,
+ fun(_) -> test_util:stop_applications([config, couch_log]) end,
+ [
+ fun check_simple_configs/1,
+ fun check_bypass_configs/1
+ ]
+ }
+ }.
+
+
+check_simple_configs(_) ->
+ Defaults = [
+ {"concurrency", "1"},
+ {"resize_limit", "1000"},
+ {"dedupe", "true"},
+ {"scale_factor", "2.0"},
+ {"max_priority", "10000.0"},
+ {"enabled", "false"},
+ {"dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER}
+ ],
+ SetTests = [
+ {set_concurrency, 9, "9"},
+ {set_resize_limit, 8888, "8888"},
+ {set_dedupe, false, "false"},
+ {set_scale_factor, 3.14, "3.14"},
+ {set_max_priority, 99999.99, "99999.99"},
+ {set_enabled, true, "true"},
+ {set_dispatch_strategy, ?DISPATCH_FD_HASH, ?DISPATCH_FD_HASH}
+ ],
+
+ Reason = "ioq_config_tests",
+ %% Custom assert for handling floats as strings
+ Assert = fun(Expected0, Value0) ->
+ ?_assertEqual(
+ ioq_config:to_float(Expected0, Expected0),
+ ioq_config:to_float(Value0, Value0)
+ )
+ end,
+
+ Tests0 = lists:map(fun({Key, Default}) ->
+ Value = config:get("ioq2", Key, Default),
+ ?_assertEqual(Default, Value)
+ end, Defaults),
+
+ lists:foldl(fun({Fun, Value, Result}, Acc) ->
+ ok = ioq_config:Fun(Value, Reason),
+ %% Derive the config key by stripping the "set_" prefix from the setter name
+ Key = lists:sublist(atom_to_list(Fun), 5, 9999),
+ Value1 = config:get("ioq2", Key, undefined),
+ [Assert(Result, Value1) | Acc]
+ end, Tests0, SetTests).
+
+
+check_bypass_configs(_) ->
+ ok = ioq_config:set_bypass(interactive, true, "Bypassing interactive"),
+ Value = config:get_boolean("ioq2.bypass", "interactive", false),
+ ?_assertEqual(true, Value).
diff --git a/test/ioq_kv_tests.erl b/test/ioq_kv_tests.erl
new file mode 100644
index 0000000..3c34573
--- /dev/null
+++ b/test/ioq_kv_tests.erl
@@ -0,0 +1,149 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_kv_tests).
+-behaviour(proper_statem).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+
+-export([
+ initial_state/0,
+ command/1,
+ precondition/2,
+ postcondition/3,
+ next_state/3,
+ random_key/1
+]).
+
+-record(st, {kvs}).
+
+proper_test_() ->
+ PropErOpts = [
+ {to_file, user},
+ {max_size, 5},
+ {numtests, 1000}
+ ],
+ {timeout, 3600, ?_assertEqual([], proper:module(?MODULE, PropErOpts))}.
+
+
+prop_ioq_kvs_almost_any() ->
+ ?FORALL({K, V}, kvs(), begin
+ case (catch ioq_kv:put(K, V)) of
+ ok -> ioq_kv:get(K) == V;
+ {'EXIT', {invalid_term, _}} -> true;
+ _ -> false
+ end
+ end).
+
+
+prop_ioq_kvs() ->
+ ?FORALL(Cmds, commands(?MODULE),
+ begin
+ cleanup(),
+ {H, S, R} = run_commands(?MODULE, Cmds),
+ ?WHENFAIL(
+ io:format("History: ~p\nState: ~p\nRes: ~p\n", [H,S,R]),
+ R =:= ok
+ )
+ end
+ ).
+
+initial_state() ->
+ #st{kvs=dict:new()}.
+
+
+command(S) ->
+ Key = {call, ioq_kv_tests, random_key, [S]},
+ frequency([
+ {1, {call, ioq_kv, init, []}},
+ {9, {call, ioq_kv, get, [Key]}},
+ {1, {call, ioq_kv, get, [key()]}},
+ {9, {call, ioq_kv, put, [Key, val()]}},
+ {1, {call, ioq_kv, put, [key(), val()]}},
+ {2, {call, ioq_kv, delete, [Key]}},
+ {1, {call, ioq_kv, delete, [key()]}}
+ ]).
+
+
+precondition(_, _) ->
+ true.
+
+
+postcondition(_S, {call, _, init, []}, ok) ->
+ true;
+postcondition(S, {call, _, get, [Key]}, Val) ->
+ case dict:is_key(Key, S#st.kvs) of
+ true ->
+ case dict:find(Key, S#st.kvs) of
+ {ok, Val} -> true;
+ _ -> false
+ end;
+ false ->
+ case Val of
+ undefined -> true;
+ _ -> false
+ end
+ end;
+postcondition(_S, {call, _, put, [_Key, _Val]}, ok) ->
+ true;
+postcondition(_S, {call, _, delete, [_Key]}, ok) ->
+ true;
+postcondition(_S, _, _) ->
+ false.
+
+
+next_state(S, _V, {call, _, init, []}) ->
+ S;
+next_state(S, _V, {call, _, get, [_Key]}) ->
+ S;
+next_state(S, _V, {call, _, put, [Key, Val]}) ->
+ S#st{
+ kvs={call, dict, store, [Key, Val, S#st.kvs]}
+ };
+next_state(S, _V, {call, _, delete, [Key]}) ->
+ S#st{
+ kvs={call, dict, erase, [Key, S#st.kvs]}
+ }.
+
+
+random_key(#st{kvs=KVs}) ->
+ Keys0 = dict:fetch_keys(KVs),
+ %% Append a known key so the list is never empty for random:uniform/1
+ Keys = lists:append(Keys0, [foo]),
+ NumKeys = erlang:length(Keys),
+ KeyPos = random:uniform(NumKeys),
+ lists:nth(KeyPos, Keys).
+
+cleanup() ->
+ code:purge(ioq_kv_dyn),
+ code:delete(ioq_kv_dyn).
+
+
+% Generators
+
+key() -> almost_any().
+val() -> almost_any().
+kvs() -> {any(), any()}.
+
+% ioq_kv can't handle storing bitstrings whose length isn't
+% divisible by 8, so instead of being clever we define an
+% almost_any/0 generator that avoids producing them.
+almost_any() ->
+ oneof([
+ integer(),
+ float(),
+ atom(),
+ binary(),
+ ?LAZY(loose_tuple(almost_any())),
+ ?LAZY(list(almost_any()))
+ ]).
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
new file mode 100644
index 0000000..b6b7bad
--- /dev/null
+++ b/test/ioq_tests.erl
@@ -0,0 +1,68 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_tests).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all_test_() ->
+ {setup, fun setup/0, fun cleanup/1, fun instantiate/1}.
+
+setup() ->
+ Apps = test_util:start_applications([
+ config, folsom, couch_log, couch_stats, ioq
+ ]),
+ FakeServer = fun(F) ->
+ receive {'$gen_call', {Pid, Ref}, Call} ->
+ Pid ! {Ref, {reply, Call}}
+ end,
+ F(F)
+ end,
+ {Apps, spawn(fun() -> FakeServer(FakeServer) end)}.
+
+cleanup({Apps, Server}) ->
+ test_util:stop_applications(Apps),
+ exit(Server, kill).
+
+instantiate({_, S}) ->
+ [{inparallel, lists:map(fun(IOClass) ->
+ lists:map(fun(Shard) ->
+ check_call(S, make_ref(), priority(IOClass, Shard))
+ end, shards())
+ end, io_classes())},
+ ?_assertEqual(20, ioq:set_disk_concurrency(10)),
+ ?_assertError(badarg, ioq:set_disk_concurrency(0)),
+ ?_assertError(badarg, ioq:set_disk_concurrency(-1)),
+ ?_assertError(badarg, ioq:set_disk_concurrency(foo))].
+
+check_call(Server, Call, Priority) ->
+ ?_assertEqual({reply, Call}, ioq:call(Server, Call, Priority)).
+
+io_classes() -> [interactive, view_update, db_compact, view_compact,
+ internal_repl, other].
+
+shards() ->
+ [
+ <<"shards/0-1/heroku/app928427/couchrest.1317609656.couch">>,
+ <<"shards/0-1/foo">>,
+ <<"shards/0-3/foo">>,
+ <<"shards/0-1/bar">>,
+ <<"shards/0-1/kocolosk/stats.1299297461.couch">>,
+ <<"shards/0-1/kocolosk/my/db.1299297457.couch">>,
+ other
+ ].
+
+priority(view_update, Shard) ->
+ {view_update, Shard, <<"_design/foo">>};
+priority(Any, Shard) ->
+ {Any, Shard}.