Posted to commits@couchdb.apache.org by da...@apache.org on 2020/06/28 19:44:00 UTC

[couchdb-ioq] branch optimize-ioq-server created (now 396c77c)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch optimize-ioq-server
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git.


      at 396c77c  Use ioq_q in ioq_server

This branch includes the following new commits:

     new 619dc4b  Add a benchmark for ioq_server
     new ea80e0c  Implement an off-heap queue with khash
     new 396c77c  Use ioq_q in ioq_server

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb-ioq] 02/03: Implement an off-heap queue with khash

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch optimize-ioq-server
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit ea80e0c7cd8d85590cd8e636b2dcc8064af2d46a
Author: Paul J. Davis <da...@us.ibm.com>
AuthorDate: Sun Jun 28 14:41:15 2020 -0500

    Implement an off-heap queue with khash
    
    The benchmark results showed that once ioq_server's internal state
    gets large enough, the amount of work required for garbage collection
    prevents it from maintaining its performance. Moving the server state
    off-heap via khash alleviates this issue.
---
 src/ioq_q.erl        | 79 ++++++++++++++++++++++++++++++++++++++++++++
 test/ioq_q_tests.erl | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 171 insertions(+)
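
For orientation, here is a minimal usage sketch of the ioq_q API added in
this commit. It is illustrative only and assumes the khash NIF dependency is
built and on the code path. ioq_q mirrors the stdlib queue interface, but the
khash backing store is mutated in place, so every call hands back the same
queue handle.

    %% Illustrative only; ioq_q_demo is not part of the commit.
    -module(ioq_q_demo).
    -export([demo/0]).

    demo() ->
        Q = ioq_q:new(),
        true = ioq_q:is_empty(Q),
        %% in/2 mutates the underlying khash and returns the same handle,
        %% so rebinding the result is unnecessary.
        Q = ioq_q:in(a, Q),
        Q = ioq_q:in(b, Q),
        2 = ioq_q:len(Q),
        %% out/1 keeps the {{value, Item}, Q} shape of queue:out/1.
        {{value, a}, Q} = ioq_q:out(Q),
        [b] = ioq_q:to_list(Q),
        ok.

The fuzz test in test/ioq_q_tests.erl relies on exactly this queue-compatible
shape: it drives an ioq_q and a stdlib queue with the same random operations
and asserts that the results agree.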

diff --git a/src/ioq_q.erl b/src/ioq_q.erl
new file mode 100644
index 0000000..8b592b6
--- /dev/null
+++ b/src/ioq_q.erl
@@ -0,0 +1,79 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(ioq_q).
+
+-export([
+    new/0,
+
+    len/1,
+    is_empty/1,
+
+    in/2,
+    out/1,
+
+    to_list/1
+]).
+
+
+new() ->
+    {ok, Q} = khash:new(),
+    ok = khash:put(Q, next_id, 0),
+    ok = khash:put(Q, oldest_id, undefined),
+    Q.
+
+
+len(Q) ->
+    khash:size(Q) - 2. % two metadata keys
+
+
+is_empty(Q) ->
+    len(Q) == 0.
+
+
+in(Item, Q) ->
+    NextId = khash:get(Q, next_id),
+    ok = khash:put(Q, NextId, Item),
+    ok = khash:put(Q, next_id, NextId + 1),
+    case khash:get(Q, oldest_id) of
+        undefined ->
+            khash:put(Q, oldest_id, NextId);
+        _ ->
+            ok
+    end,
+    Q.
+
+
+out(Q) ->
+    NextId = khash:get(Q, next_id),
+    case khash:get(Q, oldest_id) of
+        undefined ->
+            {empty, Q};
+        OldestId ->
+            {value, Item} = khash:lookup(Q, OldestId),
+            khash:del(Q, OldestId),
+            case OldestId + 1 == NextId of
+                true ->
+                    % We've removed the last element in the queue
+                    khash:put(Q, oldest_id, undefined);
+                false ->
+                    khash:put(Q, oldest_id, OldestId + 1)
+            end,
+            {{value, Item}, Q}
+    end.
+
+
+to_list(Q) ->
+    Entries = khash:to_list(Q),
+    NoMeta = [{K, V} || {K, V} <- Entries, is_integer(K)],
+    Sorted = lists:sort(NoMeta),
+    {_, Items} = lists:unzip(Sorted),
+    Items.
diff --git a/test/ioq_q_tests.erl b/test/ioq_q_tests.erl
new file mode 100644
index 0000000..05adc63
--- /dev/null
+++ b/test/ioq_q_tests.erl
@@ -0,0 +1,92 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_q_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(NUM_ITERATIONS, 10000).
+
+-record(st, {
+    ioq_q = ioq_q:new(),
+    erl_q = queue:new()
+}).
+
+
+q_fuzz_test() ->
+    Ops = #{
+        len => 0.2,
+        is_empty => 0.1,
+        in => 0.4,
+        out => 0.2,
+        to_list => 0.1
+    },
+    FinalSt = lists:foldl(fun(_Iter, St) ->
+        Op = map_choice(Ops),
+        #st{
+            ioq_q = IOQQ,
+            erl_q = ErlQ
+        } = St,
+        %?debugFmt("~nOp: ~p~n~p~n~p~n", [Op, ioq_q:to_list(IOQQ), queue:to_list(ErlQ)]),
+        case Op of
+            in ->
+                Item = new_item(),
+                #st{
+                    ioq_q = ioq_q:in(Item, IOQQ),
+                    erl_q = queue:in(Item, ErlQ)
+                };
+            out ->
+                {R1, NewIOQQ} = ioq_q:out(IOQQ),
+                {R2, NewErlQ} = queue:out(ErlQ),
+                ?assertEqual(R2, R1),
+                #st{
+                    ioq_q = NewIOQQ,
+                    erl_q = NewErlQ
+                };
+            _ ->
+                R1 = ioq_q:Op(IOQQ),
+                R2 = queue:Op(ErlQ),
+                ?assertEqual(R2, R1),
+                St
+        end
+    end, #st{}, lists:seq(1, ?NUM_ITERATIONS)),
+    check_drain(FinalSt).
+
+
+check_drain(#st{ioq_q = IOQQ, erl_q = ErlQ}) ->
+    {R1, NewIOQQ} = ioq_q:out(IOQQ),
+    {R2, NewErlQ} = queue:out(ErlQ),
+    ?assertEqual(R2, R1),
+    case R1 == empty of
+        true ->
+            ok;
+        false ->
+            check_drain(#st{ioq_q = NewIOQQ, erl_q = NewErlQ})
+    end.
+
+
+new_item() ->
+    rand:uniform().
+
+
+map_choice(#{} = Choices) ->
+    KVs = maps:to_list(Choices),
+    Sorted = lists:sort([{V, K} || {K, V} <- KVs]),
+    choose(Sorted, rand:uniform()).
+
+
+choose([{Perc, Item}], RandVal) when RandVal =< Perc ->
+    Item;
+choose([{Perc, Item} | _Rest], RandVal) when RandVal < Perc ->
+    Item;
+choose([{Perc, _Item} | Rest], RandVal) ->
+    choose(Rest, RandVal - Perc).


[couchdb-ioq] 01/03: Add a benchmark for ioq_server

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch optimize-ioq-server
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 619dc4bf3079eae8f289f60d251bef05ab270209
Author: Paul J. Davis <da...@us.ibm.com>
AuthorDate: Sun Jun 28 14:40:59 2020 -0500

    Add a benchmark for ioq_server
---
 src/ioq_benchmark.erl | 403 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 403 insertions(+)
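
The commit message is terse, so for orientation: run/1 creates the ?PIDS and
?WORKERS ETS tables and spawns the simulated file processes; run_dynamic/1
then ramps up request-generating workers according to the dynamic_load
options, while monitor_dynamic/1 samples ioq_server's message queue length
and heap size once per second and prints one CSV row per sample. A shell
invocation sketch (illustrative only; the option values are the defaults from
default_opts/0, with num_files overridden purely as an example):

    %% Assumes the ioq application is running and ioq_server is
    %% registered locally.
    1> Opts = ioq_benchmark:default_opts().
    2> ioq_benchmark:run(Opts#{num_files => 1000}).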

diff --git a/src/ioq_benchmark.erl b/src/ioq_benchmark.erl
new file mode 100644
index 0000000..4e2ea2d
--- /dev/null
+++ b/src/ioq_benchmark.erl
@@ -0,0 +1,403 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_benchmark).
+
+
+-export([
+    default_opts/0,
+
+    run/0,
+    run/1,
+
+    run_static/1,
+    run_dynamic/1,
+
+    analyze_priority/1
+]).
+
+-define(PIDS, ioq_benchmark_file_pids).
+-define(WORKERS, ioq_benchmark_workers).
+
+% Copied from ioq_server:
+-record(request, {
+    fd,
+    msg,
+    class,
+    channel, % the name of the channel, not the actual data structure
+    from,
+    ref,
+    t0,
+    tsub
+}).
+
+
+default_opts() ->
+    #{
+        num_files => 10000,
+        active_users => 2,
+        static_load_to_drain => 200000,
+        dynamic_load => #{
+            initial_pids => 25,
+            load_per_pid => 1000, % reqs/sec
+            max_load => 500000, % reqs/sec
+            ramp_delay => 10000, % msecs between pid spawning
+            total_time => 600 % seconds
+        },
+        class_perc => #{
+            customer => 0.75,
+            replication => 0.10,
+            compaction => 0.15,
+            low => 0.0
+        },
+        op_perc => #{
+            read => 0.334,
+            write => 0.333,
+            view => 0.333
+        }
+    }.
+
+
+run() ->
+    run(default_opts()).
+
+
+run(Opts) ->
+    io:format("Starting benchmark on: ~p~n", [self()]),
+    ets:new(?PIDS, [public, set, named_table]),
+    ets:new(?WORKERS, [public, set, named_table]),
+    create_files(Opts),
+    %run_static(Opts),
+    run_dynamic(Opts).
+
+
+run_static(Opts) ->
+    pause_ioq_server(),
+    generate_static_load(Opts),
+    unpause_ioq_server(),
+    T0 = os:timestamp(),
+    wait_for_empty(T0, Opts),
+    USecDiff = timer:now_diff(os:timestamp(), T0),
+    io:format("Static Test: ~.3f~n", [USecDiff / 1000000]).
+
+
+run_dynamic(Opts) ->
+    T0 = os:timestamp(),
+    spawn_link(fun() -> load_spawner(Opts, T0) end),
+    monitor_dynamic(Opts, T0).
+
+
+load_spawner(Opts, T0) ->
+    #{
+        dynamic_load := #{
+            initial_pids := InitialPids,
+            load_per_pid := LoadPerPid,
+            max_load := MaxLoad,
+            ramp_delay := RampDelay,
+            total_time := TotalTime
+        }
+    } = Opts,
+
+    % Fill up our initial pids
+    ToSpawn = max(0, InitialPids - ets:info(?WORKERS, size)),
+    lists:foreach(fun(_) ->
+        Pid = spawn_link(fun() -> worker_loop(Opts, T0) end),
+        ets:insert(?WORKERS, {Pid, true})
+    end, lists:seq(1, ToSpawn)),
+
+    % Spawn a worker if we have room
+    MaxPids = MaxLoad div LoadPerPid,
+    HasMaxWorkers = MaxPids < ets:info(?WORKERS, size),
+    if HasMaxWorkers -> ok; true ->
+        Pid = spawn_link(fun() -> worker_loop(Opts, T0) end),
+        ets:insert(?WORKERS, {Pid, true})
+    end,
+
+    % If we have more time, wait and recurse
+    OutOfTime = timer:now_diff(os:timestamp(), T0) div 1000000 > TotalTime,
+    if OutOfTime -> ok; true ->
+        timer:sleep(RampDelay),
+        load_spawner(Opts, T0)
+    end.
+
+
+worker_loop(Opts, T0) ->
+    #{
+        dynamic_load := #{
+            load_per_pid := LoadPerPid,
+            total_time := TotalTime
+        }
+    } = Opts,
+
+    drain_msgs(),
+
+    % We can only sleep for a minimum of 1ms
+    {Msgs, Sleep} = case LoadPerPid > 1000 of
+        true -> {LoadPerPid div 1000, 1};
+        false -> {1, 1000 div LoadPerPid}
+    end,
+
+    generate_static_load(Msgs, Opts),
+
+    % If we have more time, wait and recurse
+    OutOfTime = timer:now_diff(os:timestamp(), T0) div 1000000 > TotalTime,
+    if OutOfTime -> ok; true ->
+        timer:sleep(Sleep),
+        worker_loop(Opts, T0)
+    end.
+
+
+drain_msgs() ->
+    receive
+        _ ->
+            drain_msgs()
+    after 0 ->
+        ok
+    end.
+
+
+monitor_dynamic(Opts, T0) ->
+    #{
+        dynamic_load := #{
+            load_per_pid := LoadPerPid,
+            total_time := TotalTime
+        }
+    } = Opts,
+
+    T2 = timer:now_diff(os:timestamp(), T0),
+    [{_, MQL}, {_, THS}, {_, GCI}] = process_info(whereis(ioq_server), [
+        message_queue_len,
+        total_heap_size,
+        garbage_collection_info
+    ]),
+    {_, RecentSize} = lists:keyfind(recent_size, 1, GCI),
+    NumWorkers = ets:info(?WORKERS, size),
+    Load = NumWorkers * LoadPerPid,
+    io:format("~.3f,~p,~p,~p,~p~n", [T2 / 1000000, Load, MQL, THS, RecentSize]),
+
+    OutOfTime = timer:now_diff(os:timestamp(), T0) div 1000000 > TotalTime,
+    if OutOfTime -> ok; true ->
+        timer:sleep(1000),
+        monitor_dynamic(Opts, T0)
+    end.
+
+
+create_files(#{num_files := NumFiles}) when NumFiles > 0 ->
+    create_files(NumFiles);
+
+create_files(NumFiles) when NumFiles > 0 ->
+    Pid = spawn(fun() -> file_loop() end),
+    ets:insert(?PIDS, {NumFiles - 1, Pid}),
+    create_files(NumFiles - 1);
+
+create_files(0) ->
+    ok.
+
+
+file_loop() ->
+    receive
+        {'$gen_call', From, _IoMsg} ->
+            io_wait(),
+            gen:reply(From, ok)
+    end,
+    file_loop().
+
+
+io_wait() ->
+    case rand:uniform() > 0.33 of
+        true ->
+            timer:sleep(1);
+        false ->
+            ok
+    end.
+
+
+pause_ioq_server() ->
+    erlang:suspend_process(whereis(ioq_server)).
+
+
+unpause_ioq_server() ->
+    erlang:resume_process(whereis(ioq_server)).
+
+
+generate_static_load(Opts) ->
+    #{
+        static_load_to_drain := Load
+    } = Opts,
+    generate_static_load(Load, Opts).
+
+
+generate_static_load(Load, Opts) when Load > 0 ->
+    #{
+        num_files := NumFiles,
+        active_users := Users,
+        class_perc := ClassPerc,
+        op_perc := OpPerc
+    } = Opts,
+
+    Id = trunc(rand:uniform() * NumFiles),
+    [{Id, Pid}] = ets:lookup(?PIDS, Id),
+    User = trunc(rand:uniform() * Users),
+    UserBin = integer_to_binary(User),
+    {IoPriority, IoMsg} = gen_priority_and_message(UserBin, ClassPerc, OpPerc),
+    do_call(Pid, IoMsg, IoPriority),
+    generate_static_load(Load - 1, Opts);
+
+generate_static_load(0, _) ->
+    ok.
+
+
+do_call(Pid, IoMsg, IoPriority) ->
+    {Class, Channel} = analyze_priority(IoPriority),
+    Request = #request{
+        fd = Pid,
+        msg = IoMsg,
+        channel = Channel,
+        class = Class,
+        t0 = erlang:monotonic_time()
+    },
+    % ioq_server:call(Pid, IoMsg, IoPriority).
+    Mref = erlang:make_ref(),
+    Msg = {'$gen_call', {self(), Mref}, Request},
+    erlang:send(whereis(ioq_server), Msg).
+
+
+wait_for_empty(T0, #{static_load_to_drain := Load}) ->
+    wait_for_empty_int(T0, Load).
+
+
+wait_for_empty_int(T0, Remain) when Remain > 0 ->
+    receive
+        _Msg ->
+            case (Remain rem 1000) == 0 of
+                true ->
+                    T2 = timer:now_diff(os:timestamp(), T0),
+                    {_, MQL} = process_info(whereis(ioq_server), message_queue_len),
+                    io:format("~.3f,~p,~p~n", [T2 / 1000000, Remain, MQL]);
+                false ->
+                    ok
+            end,
+            wait_for_empty_int(T0, Remain - 1)
+    after 1000 ->
+        T1 = timer:now_diff(os:timestamp(), T0),
+        {_, MQL} = process_info(whereis(ioq_server), message_queue_len),
+        io:format("~.3f,~p,~p~n", [T1 / 1000000, Remain, MQL]),
+        wait_for_empty_int(T0, Remain)
+    end;
+
+wait_for_empty_int(_T0, 0) ->
+    ok.
+
+
+gen_priority_and_message(UserBin, ClassPerc, OpPerc) ->
+    Class = map_choice(ClassPerc),
+    IoType = case Class of
+        customer ->
+            case map_choice(OpPerc) of
+                read ->
+                    interactive;
+                write ->
+                    db_update;
+                view ->
+                    view_update
+            end;
+        compaction ->
+            case rand:uniform() < 0.5 of
+                true -> db_compact;
+                false -> view_compact
+            end;
+        replication ->
+            internal_repl;
+        low ->
+            low
+    end,
+    IoMsg = case map_choice(OpPerc) of
+        read ->
+            {pread_iolist, rand:uniform()};
+        write ->
+            {append_binary, rand:uniform()};
+        view ->
+            case rand:uniform() < 0.6 of
+                true ->
+                    {pread_iolist, rand:uniform()};
+                false ->
+                    {append_binary, rand:uniform()}
+            end
+    end,
+    {{IoType, <<"shards/00000000-ffffffff/", UserBin/binary, "/foo">>}, IoMsg}.
+
+
+map_choice(#{} = Choices) ->
+    KVs = maps:to_list(Choices),
+    Sorted = lists:sort([{V, K} || {K, V} <- KVs]),
+    choose(Sorted, rand:uniform()).
+
+
+choose([{Perc, Item}], RandVal) when RandVal =< Perc ->
+    Item;
+choose([{Perc, Item} | _Rest], RandVal) when RandVal < Perc ->
+    Item;
+choose([{Perc, _Item} | Rest], RandVal) ->
+    choose(Rest, RandVal - Perc).
+
+% Copied from ioq_server
+analyze_priority({interactive, Shard}) ->
+    {interactive, channel_name(Shard)};
+analyze_priority({db_update, Shard}) ->
+    {db_update, channel_name(Shard)};
+analyze_priority({view_update, Shard, _GroupId}) ->
+    {view_update, channel_name(Shard)};
+analyze_priority({db_compact, _Shard}) ->
+    {db_compact, nil};
+analyze_priority({view_compact, _Shard, _GroupId}) ->
+    {view_compact, nil};
+analyze_priority({internal_repl, _Shard}) ->
+    {internal_repl, nil};
+analyze_priority({low, _Shard}) ->
+    {low, nil};
+analyze_priority(_Else) ->
+    {other, other}.
+
+channel_name(Shard) ->
+    try split(Shard) of
+    [<<"shards">>, _, <<"heroku">>, AppId | _] ->
+        <<AppId/binary, ".heroku">>;
+    [<<"shards">>, _, DbName] ->
+        ioq_kv:get({other, DbName}, other);
+    [<<"shards">>, _, Account, DbName] ->
+        ioq_kv:get({Account, DbName}, Account);
+    [<<"shards">>, _, Account | DbParts] ->
+        ioq_kv:get({Account, filename:join(DbParts)}, Account);
+    _ ->
+        other
+    catch _:_ ->
+        other
+    end.
+
+
+split(B) when is_binary(B) ->
+    split(B, 0, 0, []);
+split(B) -> B.
+
+split(B, O, S, Acc) ->
+    case B of
+    <<_:O/binary>> ->
+        Len = O - S,
+        <<_:S/binary, Part:Len/binary>> = B,
+        lists:reverse(Acc, [Part]);
+    <<_:O/binary, $/, _/binary>> ->
+        Len = O - S,
+        <<_:S/binary, Part:Len/binary, _/binary>> = B,
+        split(B, O+1, O+1, [Part | Acc]);
+    _ ->
+        split(B, O+1, S, Acc)
+    end.
\ No newline at end of file


[couchdb-ioq] 03/03: Use ioq_q in ioq_server

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch optimize-ioq-server
in repository https://gitbox.apache.org/repos/asf/couchdb-ioq.git

commit 396c77cdc1740876a46445eb40a7f08aa0bbc0b3
Author: Paul J. Davis <da...@us.ibm.com>
AuthorDate: Sun Jun 28 14:42:29 2020 -0500

    Use ioq_q in ioq_server
    
    I need to add request deduplication, backpressure, and load shedding next.
    We can still overwhelm ioq_server when traffic spikes, but it's at least
    not as sudden.
---
 src/ioq_server.erl | 186 ++++++++++++++---------------------------------------
 1 file changed, 49 insertions(+), 137 deletions(-)
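
A note on the design shift this diff makes (illustrative commentary, not part
of the commit): because ioq_q and the new channels_by_name khash are mutated
in place, enqueue_channel no longer has to rebuild the channels queue with
lists:keyreplace and setelement to persist an update, which is what the
removed "running on thin ice" code did. A condensed sketch of the new
pattern, with names taken from the diff below; it pushes everything onto qI
for brevity, whereas the real update_channel/3 routes by request class:

    %% Illustrative sketch only -- see the real find_channel/2 and
    %% enqueue_channel/2 in the diff that follows.
    enqueue_sketch(#request{channel = Account} = Req, #state{} = State) ->
        #state{channels_by_name = ByName, channels = Chans} = State,
        case khash:lookup(ByName, Account) of
            {value, #channel{qI = QI}} ->
                %% Existing channel: push onto its queue in place.
                ioq_q:in(Req, QI);
            not_found ->
                %% New channel: register it by name and add it to the
                %% round-robin channels queue (qI/qU/qV default to ioq_q:new()).
                NewCh = #channel{name = Account},
                ioq_q:in(Req, NewCh#channel.qI),
                khash:put(ByName, Account, NewCh),
                ioq_q:in(NewCh, Chans)
        end,
        %% The state record is returned unchanged; every queue was mutated in place.
        State.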

diff --git a/src/ioq_server.erl b/src/ioq_server.erl
index fce5071..1bce2d3 100644
--- a/src/ioq_server.erl
+++ b/src/ioq_server.erl
@@ -21,9 +21,9 @@
 
 -record(channel, {
     name,
-    qI = queue:new(), % client readers
-    qU = queue:new(), % db updater
-    qV = queue:new()  % view index updates
+    qI = ioq_q:new(), % client readers
+    qU = ioq_q:new(), % db updater
+    qV = ioq_q:new()  % view index updates
 }).
 
 -record(state, {
@@ -31,10 +31,11 @@
     histos,
     reqs = [],
     concurrency = 20,
-    channels = queue:new(),
-    qC = queue:new(), % compaction
-    qR = queue:new(), % internal replication
-    qL = queue:new(),
+    channels_by_name,
+    channels = ioq_q:new(),
+    qC = ioq_q:new(), % compaction
+    qR = ioq_q:new(), % internal replication
+    qL = ioq_q:new(),
     dedupe,
     class_priorities,
     op_priorities
@@ -51,14 +52,9 @@
     tsub
 }).
 
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%% This server relies on the internal structure of the channels queue as a   %%
-%% {list(), list()} to do in-place modifications of some elements.  We are   %%
-%% "running on thin ice", as it were.                                        %%
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
 start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+    Opts = [{spawn_opt, [{fullsweep_after, 10}]}],
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], Opts).
 
 call(Fd, Msg, Priority) ->
     {Class, Channel} = analyze_priority(Priority),
@@ -89,7 +85,10 @@ list_custom_channels() ->
     ioq_kv:all().
 
 init([]) ->
+    process_flag(message_queue_data, off_heap),
+    {ok, ChannelsByName} = khash:new(),
     State = #state{
+        channels_by_name = ChannelsByName,
         counters = ets:new(ioq_counters, []),
         histos = ets:new(ioq_histos, [named_table, ordered_set])
     },
@@ -110,12 +109,12 @@ handle_call(get_counters, _From, #state{counters = Tab} = State) ->
 
 handle_call(get_queue_depths, _From, State) ->
     Channels = lists:map(fun(#channel{name=N, qI=I, qU=U, qV=V}) ->
-        {N, [queue:len(I), queue:len(U), queue:len(V)]}
-    end, queue:to_list(State#state.channels)),
+        {N, [ioq_q:len(I), ioq_q:len(U), ioq_q:len(V)]}
+    end, ioq_q:to_list(State#state.channels)),
     Response = [
-        {compaction, queue:len(State#state.qC)},
-        {replication, queue:len(State#state.qR)},
-        {low, queue:len(State#state.qL)},
+        {compaction, ioq_q:len(State#state.qC)},
+        {replication, ioq_q:len(State#state.qR)},
+        {low, ioq_q:len(State#state.qL)},
         {channels, {Channels}}
     ],
     {reply, Response, State, 0};
@@ -144,11 +143,9 @@ handle_cast(_Msg, State) ->
 
 handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
     case lists:keytake(Ref, #request.ref, Reqs) of
-    {value, #request{from=From} = Req, Reqs2} ->
-        TResponse = erlang:monotonic_time(),
+    {value, #request{from=From}, Reqs2} ->
         erlang:demonitor(Ref, [flush]),
         reply_to_all(From, Reply),
-        update_histograms(ioq_histos, Req, TResponse),
         {noreply, State#state{reqs = Reqs2}, 0};
     false ->
         {noreply, State, 0}
@@ -256,17 +253,18 @@ enqueue_request(#request{class = low} = Req, State) ->
 enqueue_request(Req, State) ->
     enqueue_channel(Req, State).
 
-find_channel(Account, {A, B}) ->
-    case lists:keyfind(Account, #channel.name, A) of
-    false ->
-        case lists:keyfind(Account, #channel.name, B) of
-        false ->
-            {new, #channel{name = Account}};
-        #channel{} = Channel ->
-            {2, Channel}
-        end;
-    #channel{} = Channel ->
-        {1, Channel}
+find_channel(Account, #state{} = State) ->
+    #state{
+        channels_by_name = ChannelsByName
+    } = State,
+    case khash:lookup(ChannelsByName, Account) of
+        {value, Channel} ->
+            couch_log:error("XKCD: found channel", []),
+            Channel;
+        not_found ->
+            Channel = #channel{name = Account},
+            khash:put(ChannelsByName, Account, Channel),
+            {new, Channel}
     end.
 
 update_channel(Ch, #request{class = view_update} = Req, Dedupe) ->
@@ -277,66 +275,20 @@ update_channel(Ch, Req, Dedupe) ->
     % everything else is interactive IO class
     Ch#channel{qI = update_queue(Req, Ch#channel.qI, Dedupe)}.
 
-update_queue(#request{from=From, fd=Fd, msg={pread_iolist, Pos}}=Req, Q, DD) ->
-    case maybe_dedupe(Fd, Pos, Q, DD) of
-    false ->
-        queue:in(Req, Q);
-    {Elem, N, #request{from=From1}=Match} ->
-        catch couch_stats:increment_counter([couchdb, io_queue, merged]),
-        Match1 = Match#request{from=append(From, From1)},
-        L = element(Elem, Q),
-        {H, [Match|T]} = lists:split(N, L),
-        setelement(Elem, Q, H ++ [Match1|T])
-    end;
 update_queue(Req, Q, _Dedupe) ->
-    queue:in(Req, Q).
-
-append(A, B) when is_list(B) ->
-    [A|B];
-append(A, B) ->
-    [A, B].
-
-maybe_dedupe(Fd, Pos, Q, Dedupe) ->
-    case Dedupe of
-        true -> matching_request(Fd, Pos, Q);
-        false -> false
-    end.
-
-matching_request(Fd, Pos, {A, B}) ->
-    case matching_request(Fd, Pos, A) of
-    false ->
-        case matching_request(Fd, Pos, B) of
-        false ->
-            false;
-        {N, Request} ->
-            {2, N, Request}
-        end;
-    {N, Request} ->
-        {1, N, Request}
-    end;
-matching_request(Fd, Pos, List) ->
-    matching_request(Fd, Pos, 0, List).
-
-matching_request(_Fd, _Pos, _N, []) ->
-    false;
-matching_request(Fd, Pos, N, [#request{fd=Fd, msg={pread_iolist, Pos}}=Req|_]) ->
-    {N, Req};
-matching_request(Fd, Pos, N, [_|Rest]) ->
-    matching_request(Fd, Pos, N + 1, Rest).
+    ioq_q:in(Req, Q).
 
 enqueue_channel(#request{channel=Account} = Req, #state{channels=Q} = State) ->
     DD = State#state.dedupe,
-    case find_channel(Account, Q) of
-    {new, Channel0} ->
-        State#state{channels = queue:in(update_channel(Channel0, Req, DD), Q)};
-    {Elem, Channel0} ->
-        Channel = update_channel(Channel0, Req, DD),
-        % the channel already exists in the queue - update it in place
-        L = element(Elem, Q),
-        NewL = lists:keyreplace(Account, #channel.name, L, Channel),
-        NewQ = setelement(Elem, Q, NewL),
-        State#state{channels = NewQ}
-    end.
+    % ioq_q's are update-in-place
+    case find_channel(Account, State) of
+        {new, Channel} ->
+            update_channel(Channel, Req, DD),
+            ioq_q:in(Channel, Q);
+        Channel ->
+            update_channel(Channel, Req, DD)
+    end,
+    State.
 
 maybe_submit_request(#state{concurrency=C,reqs=R} = St) when length(R) < C ->
     case make_next_request(St) of
@@ -367,6 +319,7 @@ sort_queues([{Q, Pri} | Rest], Norm, Choice, X, Skipped, Acc) ->
 
 make_next_request(State) ->
     #state{
+        channels_by_name = ChByName,
         channels = Ch,
         qC = QC,
         qR = QR,
@@ -387,13 +340,15 @@ make_next_request(State) ->
         % queue after we've extracted one here
         {Item2, [QI2, QU2, QV2]} =
             choose_next_request([QI, QU, QV], OpP),
-        case queue:is_empty(QU2) andalso
-             queue:is_empty(QI2) andalso
-             queue:is_empty(QV2) of
+        case ioq_q:is_empty(QU2) andalso
+             ioq_q:is_empty(QI2) andalso
+             ioq_q:is_empty(QV2) of
         true ->
+            % An empty channel needs to be removed from channels_by_name
+            khash:del(ChByName, Channel#channel.name),
             NewCh2 = NewCh;
         false ->
-            NewCh2 = queue:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
+            NewCh2 = ioq_q:in(Channel#channel{qI=QI2, qU=QU2, qV=QV2}, NewCh)
         end,
         submit_request(Item2, State#state{channels=NewCh2, qC=NewQC,
             qR=NewQR, qL=NewQL});
@@ -430,55 +385,12 @@ submit_request(Request, State) ->
 update_counter(Tab, Channel, IOClass, RW) ->
     upsert(Tab, {Channel, IOClass, RW}, 1).
 
-update_histograms(Tab, Req, TResponse) ->
-    #request{t0=T0, tsub=TSubmit, class=Class, channel=Channel, msg=Msg} = Req,
-    Delta1 = erlang:convert_time_unit(
-        TSubmit - T0, native, microsecond),
-    Delta2 = erlang:convert_time_unit(
-        TResponse - TSubmit, native, microsecond),
-    Bin1 = timebin(Delta1),
-    Bin2 = timebin(Delta2),
-    Bin3 = timebin(Delta1+Delta2),
-    if Channel =/= nil ->
-        upsert(Tab, {Channel, submit_delay, Bin1}, 1),
-        upsert(Tab, {Channel, svctm, Bin2}, 1),
-        upsert(Tab, {Channel, iowait, Bin3}, 1);
-    true -> ok end,
-    Key = make_key(Class, Msg),
-    upsert(Tab, {Key, submit_delay, Bin1}, 1),
-    upsert(Tab, {Key, svctm, Bin2}, 1),
-    upsert(Tab, {Key, iowait, Bin3}, 1).
-
-make_key(db_compact, _) ->
-    <<"compaction">>;
-make_key(view_compact, _) ->
-    <<"compaction">>;
-make_key(internal_repl, _) ->
-    <<"replication">>;
-make_key(low, _) ->
-    <<"low">>;
-make_key(view_update, _) ->
-    <<"views">>;
-make_key(db_update, _) ->
-    <<"writes">>;
-make_key(interactive, {pread_iolist, _}) ->
-    <<"reads">>;
-make_key(interactive, {append_bin, _}) ->
-    <<"writes">>;
-make_key(_, _) ->
-    <<"other">>.
-
 upsert(Tab, Key, Incr) ->
     try ets:update_counter(Tab, Key, Incr)
     catch error:badarg ->
         ets:insert(Tab, {Key, Incr})
     end.
 
-timebin(0) ->
-    0;
-timebin(V) ->
-    trunc(10*math:log10(V)).
-
 choose_next_request(Qs, Priorities) ->
     Norm = lists:sum(Priorities),
     QueuesAndPriorities = lists:zip(Qs, Priorities),
@@ -493,7 +405,7 @@ choose_prioritized_request(Qs) ->
 choose_prioritized_request([], Empties) ->
     {nil, lists:reverse(Empties)};
 choose_prioritized_request([Q | Rest], Empties) ->
-    case queue:out(Q) of
+    case ioq_q:out(Q) of
     {empty, _} ->
         choose_prioritized_request(Rest, [Q | Empties]);
     {{value, Item}, NewQ} ->