Posted to commits@couchdb.apache.org by va...@apache.org on 2022/11/13 05:33:56 UTC

[couchdb] 04/04: smoosh optimization wip

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch optimize-smoosh
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b7206c1f4b471b8fdfe7dc1e837fbeb508b6d5b1
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Tue Nov 8 14:30:22 2022 -0500

    smoosh optimization wip
---
 src/smoosh/src/smoosh_channel.erl | 689 ++++++++++++++++----------------------
 src/smoosh/src/smoosh_persist.erl |  11 +-
 src/smoosh/src/smoosh_server.erl  | 357 +++++++++++---------
 src/smoosh/test/smoosh_tests.erl  | 481 ++++++++++++++++++--------
 4 files changed, 854 insertions(+), 684 deletions(-)

diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index 50274f704..3ca515679 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -12,52 +12,50 @@
 
 -module(smoosh_channel).
 -behaviour(gen_server).
--vsn(1).
--include_lib("couch/include/couch_db.hrl").
 
 % public api.
--export([start_link/1, close/1, suspend/1, resume/1, activate/1, get_status/1]).
--export([enqueue/3, last_updated/2, flush/1, is_key/2, is_activated/1]).
+-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
+-export([enqueue/3, flush/1]).
+-export([get_status_table/1]).
 
 % gen_server api.
 -export([
     init/1,
     handle_call/3,
     handle_cast/2,
-    handle_info/2,
-    terminate/2
+    handle_info/2
 ]).
 
--define(VSN, 1).
--define(CHECKPOINT_INTERVAL_IN_MSEC, 180000).
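+% Tag used to key index cleanup jobs as {?INDEX_CLEANUP, DbName}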
+-define(INDEX_CLEANUP, index_cleanup).
+-define(STATUS_UPDATE_INTERVAL_MSEC, 4900).
+-define(TIME_WINDOW_MSEC, 60 * 1000).
 
 -ifndef(TEST).
--define(START_DELAY_IN_MSEC, 60000).
--define(ACTIVATE_DELAY_IN_MSEC, 30000).
+-define(CHECKPOINT_INTERVAL_MSEC, 180000).
 -else.
--define(START_DELAY_IN_MSEC, 0).
--define(ACTIVATE_DELAY_IN_MSEC, 0).
--export([persist/1]).
+-define(CHECKPOINT_INTERVAL_MSEC, 1000).
 -endif.
 
-% records.
-
-% When the state is set to activated = true, the channel has completed the state
-% recovery process that occurs on (re)start and is accepting new compaction jobs.
-% Note: if activated = false and a request for a new compaction job is received,
-% smoosh will enqueue this new job after the state recovery process has finished.
-% When the state is set to paused = false, the channel is actively compacting any
-% compaction jobs that are scheduled.
-% See operator_guide.md --> State diagram.
+% If persistence is configured, on startup the channel will try to load its
+% waiting queue from a persisted queue data file. Then, every
+% CHECKPOINT_INTERVAL_MSEC, it will spawn a checkpointer process to write the
+% persisted queue state to disk.
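+%
+% A minimal configuration sketch (the values below are illustrative, not
+% defaults):
+%
+%   [smoosh]
+%   persist = true
+%   state_dir = ./smoosh_state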
 
 -record(state, {
-    active = [],
+    % Channel name
     name,
+    % smoosh_priority_queue:new() object
     waiting,
+    % Paused flag. The channel starts in the paused state.
     paused = true,
-    starting = [],
-    activated = false,
-    requests = []
+    % #{Key => Pid}
+    active = #{},
+    % #{Ref => Key}
+    starting = #{},
+    % Monitor reference of the checkpointer process
+    cref,
+    % ETS status table handle. Used to publish channel status.
+    stab
 }).
 
 % public functions.
@@ -69,19 +67,19 @@ suspend(ServerRef) ->
     gen_server:call(ServerRef, suspend).
 
 resume(ServerRef) ->
-    gen_server:call(ServerRef, resume_and_activate).
-
-activate(ServerRef) ->
-    gen_server:call(ServerRef, activate).
+    gen_server:call(ServerRef, resume).
 
 enqueue(ServerRef, Object, Priority) ->
     gen_server:cast(ServerRef, {enqueue, Object, Priority}).
 
-last_updated(ServerRef, Object) ->
-    gen_server:call(ServerRef, {last_updated, Object}).
-
-get_status(ServerRef) ->
-    gen_server:call(ServerRef, status).
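+% Read the latest published status directly from the channel's ETS status
+% table; returns [] if the table does not exist (e.g. the channel has just
+% restarted).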
+get_status(StatusTab) when is_reference(StatusTab) ->
+    try ets:lookup(StatusTab, status) of
+        [{status, Status}] -> Status;
+        [] -> []
+    catch
+        error:badarg ->
+            []
+    end.
 
 close(ServerRef) ->
     gen_server:call(ServerRef, close).
@@ -89,137 +87,94 @@ close(ServerRef) ->
 flush(ServerRef) ->
     gen_server:call(ServerRef, flush).
 
-is_key(ServerRef, Key) ->
-    gen_server:call(ServerRef, {is_key, Key}).
-
-is_activated(ServerRef) ->
-    gen_server:call(ServerRef, is_activated).
-
-% Only exported to force persistence in tests
-persist(ServerRef) ->
-    gen_server:call(ServerRef, persist).
+get_status_table(ServerRef) ->
+    gen_server:call(ServerRef, get_status_table).
 
 % gen_server functions.
 
 init(Name) ->
-    erlang:send_after(60 * 1000, self(), check_window),
     process_flag(trap_exit, true),
-    Waiting = smoosh_priority_queue:new(Name),
-    Persist = config:get_boolean("smoosh", "persist", false),
-    State =
-        case smoosh_utils:is_view_channel(Name) orelse Persist =:= false of
-            true ->
-                schedule_unpause(),
-                #state{name = Name, waiting = Waiting, paused = true, activated = true};
-            false ->
-                erlang:send_after(?START_DELAY_IN_MSEC, self(), start_recovery),
-                #state{name = Name, waiting = Waiting, paused = true, activated = false}
-        end,
-    {ok, State}.
-
-handle_call({last_updated, Object}, _From, State) ->
-    LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
-    {reply, LastUpdated, State};
-handle_call(suspend, _From, State) ->
-    #state{active = Active} = State,
-    [
-        catch erlang:suspend_process(Pid, [unless_suspending])
-     || {_, Pid} <- Active
-    ],
+    process_flag(message_queue_data, off_heap),
+    schedule_check_window(),
+    schedule_update_status(),
+    schedule_checkpoint(),
+    schedule_unpause(),
+    State = #state{
+        name = Name,
+        waiting = smoosh_persist:unpersist(Name),
+        stab = ets:new(smoosh_stats, [{read_concurrency, true}])
+    },
+    {ok, set_status(State)}.
+
+handle_call(get_status_table, _From, #state{} = State) ->
+    {reply, {ok, State#state.stab}, State};
+handle_call(suspend, _From, #state{active = Active} = State) ->
+    [suspend_pid(Pid) || Pid <- maps:values(Active)],
     {reply, ok, State#state{paused = true}};
-handle_call(resume_and_activate, _From, State) ->
-    #state{active = Active} = State,
-    [catch erlang:resume_process(Pid) || {_, Pid} <- Active],
-    {reply, ok, State#state{paused = false, activated = true}};
-handle_call(activate, _From, State) ->
-    {reply, ok, State#state{activated = true}};
-handle_call(status, _From, State) ->
-    {reply,
-        {ok, [
-            {active, length(State#state.active)},
-            {starting, length(State#state.starting)},
-            {waiting, smoosh_priority_queue:info(State#state.waiting)}
-        ]},
-        State};
+handle_call(resume, _From, #state{active = Active} = State) ->
+    [resume_pid(Pid) || Pid <- maps:values(Active)],
+    {reply, ok, State#state{paused = false}};
 handle_call(close, _From, State) ->
     {stop, normal, ok, State};
 handle_call(flush, _From, #state{waiting = Q} = State) ->
-    {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}};
-handle_call({is_key, Key}, _From, #state{waiting = Waiting} = State) ->
-    {reply, smoosh_priority_queue:is_key(Key, Waiting), State};
-handle_call(is_activated, _From, #state{activated = Activated} = State0) ->
-    {reply, Activated, State0};
-handle_call(persist, _From, State) ->
-    persist_queue(State),
-    {reply, ok, State}.
-
-handle_cast({enqueue, _Object, 0}, #state{} = State) ->
-    {noreply, State};
-handle_cast({enqueue, Object, Priority}, #state{activated = true} = State) ->
-    {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))};
-handle_cast({enqueue, Object, Priority}, #state{activated = false, requests = Requests} = State0) ->
-    Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-    couch_log:Level(
-        "~p Channel is not activated yet. Adding ~p to requests with priority ~p.", [
-            ?MODULE,
-            Object,
-            Priority
-        ]
-    ),
-    {noreply, State0#state{requests = [{Object, Priority} | Requests]}}.
+    State1 = State#state{waiting = smoosh_priority_queue:flush(Q)},
+    smoosh_persist:persist(State1#state.waiting, #{}, #{}),
+    {reply, ok, State1}.
 
+handle_cast({enqueue, Object, Priority}, #state{} = State) ->
+    State1 = add_to_queue(Object, Priority, State),
+    {noreply, maybe_start_compaction(State1)}.
+
+handle_info({'DOWN', Ref, _, _, _}, #state{cref = Ref} = State) ->
+    {noreply, State#state{cref = undefined}};
 % We accept noproc here due to possibly having monitored a restarted compaction
 % pid after it finished.
-handle_info({'DOWN', Ref, _, Job, Reason}, State) when
+handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) when
     Reason == normal;
     Reason == noproc
 ->
     #state{active = Active, starting = Starting} = State,
-    {noreply,
-        maybe_start_compaction(
-            State#state{
-                active = lists:keydelete(Job, 2, Active),
-                starting = lists:keydelete(Ref, 1, Starting)
-            }
-        )};
-handle_info({'DOWN', Ref, _, Job, Reason}, State) ->
-    #state{active = Active0, starting = Starting0} = State,
-    case lists:keytake(Job, 2, Active0) of
-        {value, {Key, _Pid}, Active1} ->
-            State1 = maybe_remonitor_cpid(
-                State#state{active = Active1},
-                Key,
-                Reason
-            ),
-            {noreply, maybe_start_compaction(State1)};
-        false ->
-            case lists:keytake(Ref, 1, Starting0) of
-                {value, {_, Key}, Starting1} ->
-                    couch_log:warning("failed to start compaction of ~p: ~p", [
-                        smoosh_utils:stringify(Key),
-                        Reason
-                    ]),
-                    {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
-                    {noreply, maybe_start_compaction(State#state{starting = Starting1})};
-                false ->
+    Active1 = maps:filter(fun(_, Pid) -> Pid =/= Job end, Active),
+    Starting1 = maps:remove(Ref, Starting),
+    State1 = State#state{active = Active1, starting = Starting1},
+    {noreply, maybe_start_compaction(State1)};
+handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) ->
+    #state{name = Name, active = Active, starting = Starting} = State,
+    FoundActive = maps:filter(fun(_, Pid) -> Pid =:= Job end, Active),
+    case maps:to_list(FoundActive) of
+        [{Key, _Pid}] ->
+            Active1 = maps:remove(Key, Active),
+            State1 = State#state{active = Active1},
+            State2 = maybe_remonitor_cpid(State1, Key, Reason),
+            {noreply, maybe_start_compaction(State2)};
+        [] ->
+            case maps:take(Ref, Starting) of
+                {Key, Starting1} ->
+                    LogMsg = "~s : failed to start compaction of ~p: ~p",
+                    LogArgs = [Name, smoosh_utils:stringify(Key), Reason],
+                    couch_log:warning(LogMsg, LogArgs),
+                    re_enqueue(Key),
+                    State1 = State#state{starting = Starting1},
+                    {noreply, maybe_start_compaction(State1)};
+                error ->
                     {noreply, State}
             end
     end;
-handle_info({Ref, {ok, Pid}}, State) when is_reference(Ref) ->
-    case lists:keytake(Ref, 1, State#state.starting) of
-        {value, {_, Key}, Starting1} ->
+% This is the '$gen_call' response handling
+handle_info({Ref, {ok, Pid}}, #state{} = State) when is_reference(Ref) ->
+    #state{name = Name, active = Active, starting = Starting} = State,
+    case maps:take(Ref, Starting) of
+        {Key, Starting1} ->
             Level = smoosh_utils:log_level("compaction_log_level", "notice"),
-            couch_log:Level(
-                "~s: Started compaction for ~s",
-                [State#state.name, smoosh_utils:stringify(Key)]
-            ),
+            LogMsg = "~s: Started compaction for ~s",
+            LogArgs = [Name, smoosh_utils:stringify(Key)],
+            couch_log:Level(LogMsg, LogArgs),
             erlang:monitor(process, Pid),
             erlang:demonitor(Ref, [flush]),
-            {noreply, State#state{
-                active = [{Key, Pid} | State#state.active],
-                starting = Starting1
-            }};
-        false ->
+            Active1 = Active#{Key => Pid},
+            State1 = State#state{active = Active1, starting = Starting1},
+            {noreply, State1};
+        error ->
             {noreply, State}
     end;
 handle_info(check_window, State) ->
@@ -235,7 +190,7 @@ handle_info(check_window, State) ->
                 State;
             {false, true} ->
                 % resume is always safe even if we did not previously suspend
-                {reply, ok, NewState} = handle_call(resume_and_activate, nil, State),
+                {reply, ok, NewState} = handle_call(resume, nil, State),
                 NewState;
             {true, false} ->
                 if
@@ -246,234 +201,129 @@ handle_info(check_window, State) ->
                         State#state{paused = true}
                 end
         end,
-    erlang:send_after(60 * 1000, self(), check_window),
+    schedule_check_window(),
     {noreply, FinalState};
-handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) ->
-    RecActive = recover(active_file_name(Name)),
-    Waiting1 = lists:foldl(
-        fun(DbName, Acc) ->
-            case couch_db:exists(DbName) andalso couch_db:is_compacting(DbName) of
-                true ->
-                    Priority = smoosh_server:get_priority(Name, DbName),
-                    smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
-                false ->
-                    Acc
-            end
-        end,
-        Waiting0,
-        RecActive
-    ),
-    State1 = maybe_start_compaction(State0#state{paused = false, waiting = Waiting1}),
-    Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-    couch_log:Level(
-        "~p Previously active compaction jobs (if any) have been successfully recovered and restarted.",
-        [?MODULE]
-    ),
-    erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate),
-    {noreply, State1#state{paused = true}};
-handle_info(activate, State) ->
-    {noreply, activate_channel(State)};
-handle_info(persist, State) ->
-    ok = persist_and_reschedule(State),
+handle_info(update_status, #state{} = State) ->
+    schedule_update_status(),
+    {noreply, set_status(State)};
+handle_info(checkpoint, #state{cref = Ref} = State) when is_reference(Ref) ->
+    % If a checkpointer process is still running, don't start another one.
+    schedule_checkpoint(),
     {noreply, State};
+handle_info(checkpoint, #state{cref = undefined} = State) ->
+    % Start an asynchronous checkpoint process so we don't block the channel
+    #state{waiting = Waiting, active = Active, starting = Starting} = State,
+    Args = [Waiting, Active, Starting],
+    {_, Ref} = spawn_monitor(smoosh_persist, persist, Args),
+    schedule_checkpoint(),
+    {noreply, State#state{cref = Ref}};
 handle_info(pause, State) ->
     {noreply, State#state{paused = true}};
 handle_info(unpause, State) ->
     {noreply, maybe_start_compaction(State#state{paused = false})}.
 
-terminate(_Reason, _State) ->
-    ok.
-
 % private functions.
 
-recover(FilePath) ->
-    case do_recover(FilePath) of
-        {ok, List} ->
-            List;
-        error ->
-            []
-    end.
-
-do_recover(FilePath) ->
-    case file:read_file(FilePath) of
-        {ok, Content} ->
-            <<Vsn, Binary/binary>> = Content,
-            try parse_state(Vsn, ?VSN, Binary) of
-                Term ->
-                    Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-                    couch_log:Level(
-                        "~p Successfully restored state file ~s", [?MODULE, FilePath]
-                    ),
-                    {ok, Term}
-            catch
-                error:Reason ->
-                    couch_log:error(
-                        "~p Invalid state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
-                    ),
-                    file:delete(FilePath),
-                    error
-            end;
-        {error, enoent} ->
-            Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-            couch_log:Level(
-                "~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
-            ),
-            error;
-        {error, Reason} ->
-            couch_log:error(
-                "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
-            ),
-            file:delete(FilePath),
-            error
-    end.
-
-parse_state(1, ?VSN, Binary) ->
-    erlang:binary_to_term(Binary, [safe]);
-parse_state(Vsn, ?VSN, _) ->
-    error({unsupported_version, Vsn}).
-
-persist_and_reschedule(State) ->
-    persist_queue(State),
-    erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist),
-    ok.
-
-persist_queue(#state{name = Name, active = Active, starting = Starting, waiting = Waiting}) ->
-    Active1 = lists:foldl(
-        fun({DbName, _}, Acc) ->
-            [DbName | Acc]
-        end,
-        [],
-        Active
-    ),
-    Starting1 = lists:foldl(
-        fun({_, DbName}, Acc) ->
-            [DbName | Acc]
-        end,
-        [],
-        Starting
-    ),
-    smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN),
-    smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN),
-    smoosh_priority_queue:write_to_file(Waiting).
-
-active_file_name(Name) ->
-    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").
-
-starting_file_name(Name) ->
-    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").
+% Periodically publish channel status to avoid having to block on gen_server
+% get_status calls.
+%
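+% The status is written to an ETS table created in init/1 with
+% read_concurrency enabled; smoosh_server fetches the table handle once via
+% get_status_table/1 and reads it directly, so status queries never block on
+% this process.
+%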
+set_status(#state{} = State) ->
+    #state{active = Active, starting = Starting, waiting = Waiting} = State,
+    Status = [
+        {active, map_size(Active)},
+        {starting, map_size(Starting)},
+        {waiting, smoosh_priority_queue:info(Waiting)}
+    ],
+    true = ets:insert(State#state.stab, {status, Status}),
+    State.
 
 add_to_queue(Key, Priority, State) ->
-    #state{active = Active, waiting = Q} = State,
-    case lists:keymember(Key, 1, Active) of
+    #state{name = Name, active = Active, waiting = Q} = State,
+    case is_map_key(Key, Active) of
         true ->
             State;
         false ->
-            Capacity = list_to_integer(smoosh_utils:get(State#state.name, "capacity", "9999")),
+            Capacity = smoosh_utils:capacity(State#state.name),
             Level = smoosh_utils:log_level("compaction_log_level", "notice"),
-            couch_log:Level(
-                "~s: adding ~p to internal compactor queue with priority ~p",
-                [State#state.name, Key, Priority]
-            ),
-            State#state{
-                waiting = smoosh_priority_queue:in(Key, Priority, Priority, Capacity, Q)
-            }
+            LogMsg = "~s: enqueueing ~p to compact with priority ~p",
+            LogArgs = [Name, Key, Priority],
+            couch_log:Level(LogMsg, LogArgs),
+            Q1 = smoosh_priority_queue:in(Key, Priority, Capacity, Q),
+            State#state{waiting = Q1}
     end.
 
-maybe_activate(#state{activated = true} = State) ->
+maybe_start_compaction(#state{paused = true} = State) ->
     State;
-maybe_activate(State) ->
-    activate_channel(State).
-
-activate_channel(#state{name = Name, waiting = Waiting0, requests = Requests0} = State0) ->
-    RecStarting = recover(starting_file_name(Name)),
-    Starting = lists:foldl(
-        fun(DbName, Acc) ->
-            case couch_db:exists(DbName) of
-                true ->
-                    Priority = smoosh_server:get_priority(Name, DbName),
-                    smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
-                false ->
-                    Acc
-            end
-        end,
-        Waiting0,
-        RecStarting
-    ),
-    Waiting1 = smoosh_priority_queue:recover(Starting),
-    Requests1 = lists:reverse(Requests0),
-    Waiting2 = lists:foldl(
-        fun({DbName, Priority}, Acc) ->
-            case couch_db:exists(DbName) of
-                true ->
-                    smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
-                false ->
-                    Acc
-            end
-        end,
-        Waiting1,
-        Requests1
-    ),
-    State1 = maybe_start_compaction(State0#state{
-        waiting = Waiting2, paused = false, activated = true, requests = []
-    }),
-    ok = persist_and_reschedule(State1),
-    schedule_unpause(),
-    State1#state{paused = true}.
+maybe_start_compaction(#state{paused = false, name = Name} = State) ->
+    Concurrency = smoosh_utils:concurrency(Name),
+    maybe_start_compaction(Concurrency, State).
 
-maybe_start_compaction(#state{paused = true} = State) ->
+maybe_start_compaction(Concurrency, #state{active = A, starting = S} = State) when
+    map_size(A) + map_size(S) >= Concurrency
+->
     State;
-maybe_start_compaction(State) ->
-    Concurrency = list_to_integer(
-        smoosh_utils:get(
-            State#state.name,
-            "concurrency",
-            "1"
-        )
-    ),
-    if
-        length(State#state.active) + length(State#state.starting) < Concurrency ->
-            case smoosh_priority_queue:out(State#state.waiting) of
-                false ->
-                    maybe_activate(State);
-                {Key, Priority, Q} ->
-                    try
-                        State2 =
-                            case start_compact(State, Key) of
-                                false ->
-                                    State;
-                                State1 ->
-                                    Level = smoosh_utils:log_level(
-                                        "compaction_log_level",
-                                        "notice"
-                                    ),
-                                    couch_log:Level(
-                                        "~s: Starting compaction for ~s (priority ~p)",
-                                        [State#state.name, smoosh_utils:stringify(Key), Priority]
-                                    ),
-                                    State1
-                            end,
-                        maybe_start_compaction(State2#state{waiting = Q})
-                    catch
-                        Class:Exception ->
-                            couch_log:warning(
-                                "~s: ~p ~p for ~s",
-                                [
-                                    State#state.name,
-                                    Class,
-                                    Exception,
-                                    smoosh_utils:stringify(Key)
-                                ]
-                            ),
-                            maybe_start_compaction(State#state{waiting = Q})
-                    end
-            end;
-        true ->
+maybe_start_compaction(Concurrency, #state{} = State) ->
+    case smoosh_priority_queue:out(State#state.waiting) of
+        false ->
+            State;
+        {Key, Q} ->
+            State1 = State#state{waiting = Q},
+            % Re-check the priority: by the time the object reaches the front
+            % of the queue, or is un-persisted after a node restart, the db or
+            % ddoc might be long gone and we don't want to crash the channel
+            % attempting to compact it
+            State2 =
+                case priority(State1, Key) of
+                    0 -> State1;
+                    _ -> try_compact(State1, Key)
+                end,
+            maybe_start_compaction(Concurrency, State2)
+    end.
+
+priority(#state{name = Name}, Key) ->
+    try
+        smoosh_server:get_priority(Name, Key)
+    catch
+        Tag:Error ->
+            % We are being defensive as we don't want to crash the channel
+            Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+            LogMsg = "~s: Failed to get priority for ~s in queue ~p:~p",
+            LogArgs = [Name, smoosh_utils:stringify(Key), Tag, Error],
+            couch_log:Level(LogMsg, LogArgs),
+            0
+    end.
+
+try_compact(#state{name = Name} = State, Key) ->
+    try start_compact(State, Key) of
+        false ->
+            State;
+        #state{} = State1 ->
+            Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+            LogMsg = "~s: Starting compaction for ~s",
+            LogArgs = [Name, smoosh_utils:stringify(Key)],
+            couch_log:Level(LogMsg, LogArgs),
+            State1
+    catch
+        Class:Exception ->
+            LogArgs = [Name, Class, Exception, smoosh_utils:stringify(Key)],
+            couch_log:warning("~s: ~p ~p for ~s", LogArgs),
             State
     end.
 
-start_compact(State, DbName) when is_list(DbName) ->
-    start_compact(State, ?l2b(DbName));
-start_compact(State, DbName) when is_binary(DbName) ->
+start_compact(#state{} = State, {?INDEX_CLEANUP, DbName} = Key) ->
+    #state{name = Name, active = Active} = State,
+    case smoosh_utils:ignore_db(DbName) of
+        false ->
+            {Pid, _Ref} = spawn_monitor(fun() -> cleanup_index_files(DbName) end),
+            Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+            LogMsg = "~s: Starting index cleanup for ~s",
+            LogArgs = [Name, smoosh_utils:stringify(Key)],
+            couch_log:Level(LogMsg, LogArgs),
+            State#state{active = Active#{Key => Pid}};
+        _ ->
+            false
+    end;
+start_compact(#state{name = Name} = State, DbName) when is_binary(DbName) ->
     case couch_db:open_int(DbName, []) of
         {ok, Db} ->
             try
@@ -482,94 +332,139 @@ start_compact(State, DbName) when is_binary(DbName) ->
                 couch_db:close(Db)
             end;
         Error = {not_found, no_db_file} ->
-            couch_log:warning(
-                "Error starting compaction for ~p: ~p",
-                [smoosh_utils:stringify(DbName), Error]
-            ),
+            LogMsg = "~s : Error starting compaction for ~p: ~p",
+            LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+            couch_log:warning(LogMsg, LogArgs),
             false
     end;
-start_compact(State, {Shard, GroupId}) ->
+start_compact(#state{starting = Starting} = State, {Shard, GroupId}) ->
     case smoosh_utils:ignore_db({Shard, GroupId}) of
         false ->
-            DbName = mem3:dbname(Shard),
             {ok, Pid} = couch_index_server:get_index(
                 couch_mrview_index, Shard, GroupId
             ),
-            spawn(fun() -> cleanup_index_files(DbName, Shard) end),
+            schedule_cleanup_index_files(Shard),
             Ref = erlang:monitor(process, Pid),
             Pid ! {'$gen_call', {self(), Ref}, compact},
-            State#state{starting = [{Ref, {Shard, GroupId}} | State#state.starting]};
+            State#state{starting = Starting#{Ref => {Shard, GroupId}}};
         _ ->
             false
     end;
-start_compact(State, Db) ->
-    case smoosh_utils:ignore_db(Db) of
+start_compact(#state{} = State, Db) ->
+    #state{name = Name, starting = Starting, active = Active} = State,
+    Key = couch_db:name(Db),
+    case smoosh_utils:ignore_db(Key) of
         false ->
             DbPid = couch_db:get_pid(Db),
-            Key = couch_db:name(Db),
             case couch_db:get_compactor_pid(Db) of
                 nil ->
                     Ref = erlang:monitor(process, DbPid),
                     DbPid ! {'$gen_call', {self(), Ref}, start_compact},
-                    State#state{starting = [{Ref, Key} | State#state.starting]};
+                    State#state{starting = Starting#{Ref => Key}};
                 % Compaction is already running, so monitor existing compaction pid.
-                CPid ->
-                    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
-                    couch_log:Level(
-                        "Db ~s continuing compaction",
-                        [smoosh_utils:stringify(Key)]
-                    ),
+                CPid when is_pid(CPid) ->
                     erlang:monitor(process, CPid),
-                    State#state{active = [{Key, CPid} | State#state.active]}
+                    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+                    LogMsg = "~s : db ~s continuing compaction",
+                    LogArgs = [Name, smoosh_utils:stringify(Key)],
+                    couch_log:Level(LogMsg, LogArgs),
+                    State#state{active = Active#{Key => CPid}}
             end;
         _ ->
             false
     end.
 
-maybe_remonitor_cpid(State, DbName, Reason) when is_binary(DbName) ->
+maybe_remonitor_cpid(#state{} = State, DbName, Reason) when is_binary(DbName) ->
+    #state{name = Name, active = Active} = State,
     case couch_db:open_int(DbName, []) of
         {ok, Db} ->
             case couch_db:get_compactor_pid_sync(Db) of
                 nil ->
-                    couch_log:warning(
-                        "exit for compaction of ~p: ~p",
-                        [smoosh_utils:stringify(DbName), Reason]
-                    ),
-                    {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [DbName]),
+                    LogMsg = "~s : exit for compaction of ~p: ~p",
+                    LogArgs = [Name, smoosh_utils:stringify(DbName), Reason],
+                    couch_log:warning(LogMsg, LogArgs),
+                    re_enqueue(DbName),
                     State;
-                CPid ->
-                    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
-                    couch_log:Level(
-                        "~s compaction already running. Re-monitor Pid ~p",
-                        [smoosh_utils:stringify(DbName), CPid]
-                    ),
+                CPid when is_pid(CPid) ->
                     erlang:monitor(process, CPid),
-                    State#state{active = [{DbName, CPid} | State#state.active]}
+                    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+                    LogMsg = "~s: ~s compaction already running. Re-monitor Pid ~p",
+                    LogArgs = [Name, smoosh_utils:stringify(DbName), CPid],
+                    couch_log:Level(LogMsg, LogArgs),
+                    State#state{active = Active#{DbName => CPid}}
             end;
         Error = {not_found, no_db_file} ->
-            couch_log:warning(
-                "exit for compaction of ~p: ~p",
-                [smoosh_utils:stringify(DbName), Error]
-            ),
+            LogMsg = "~s : exit for compaction of ~p: ~p",
+            LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+            couch_log:warning(LogMsg, LogArgs),
             State
     end;
 % not a database compaction, so ignore the pid check
-maybe_remonitor_cpid(State, Key, Reason) ->
-    couch_log:warning(
-        "exit for compaction of ~p: ~p",
-        [smoosh_utils:stringify(Key), Reason]
-    ),
-    {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+maybe_remonitor_cpid(#state{name = Name} = State, Key, Reason) ->
+    LogMsg = "~s: exit for compaction of ~p: ~p",
+    LogArgs = [Name, smoosh_utils:stringify(Key), Reason],
+    couch_log:warning(LogMsg, LogArgs),
+    re_enqueue(Key),
     State.
 
+schedule_check_window() ->
+    erlang:send_after(?TIME_WINDOW_MSEC, self(), check_window).
+
+schedule_update_status() ->
+    erlang:send_after(?STATUS_UPDATE_INTERVAL_MSEC, self(), update_status).
+
 schedule_unpause() ->
-    WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
+    WaitSecs = config:get_integer("smoosh", "wait_secs", 30),
     erlang:send_after(WaitSecs * 1000, self(), unpause).
 
-cleanup_index_files(DbName, _Shard) ->
-    case config:get("smoosh", "cleanup_index_files", "false") of
-        "true" ->
-            fabric:cleanup_index_files(DbName);
+schedule_checkpoint() ->
+    erlang:send_after(?CHECKPOINT_INTERVAL_MSEC, self(), checkpoint).
+
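+% Re-enqueue the object after a short delay by sending smoosh_server a raw
+% '$gen_cast' message with erlang:send_after/3; unlike timer:apply_after/4
+% this avoids a round-trip through the timer server.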
+re_enqueue(Obj) ->
+    case whereis(smoosh_server) of
+        Pid when is_pid(Pid) ->
+            erlang:send_after(5000, Pid, {'$gen_cast', {enqueue, Obj}}),
+            ok;
         _ ->
             ok
     end.
+
+cleanup_index_files(DbName) ->
+    case should_clean_up_indices() of
+        true -> fabric:cleanup_index_files(DbName);
+        false -> ok
+    end.
+
+schedule_cleanup_index_files(Shard) ->
+    case should_clean_up_indices() of
+        true ->
+            % Since cleanup is at the cluster level, schedule it with a chance
+            % inversely proportional to the number of local shards
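+            % (e.g. with 4 local shards each call enqueues a cleanup job with
+            % probability 1/4, so on average one cleanup is scheduled per
+            % round of shard compactions for that database)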
+            DbName = mem3:dbname(Shard),
+            try length(mem3:local_shards(DbName)) of
+                ShardCount when ShardCount >= 1 ->
+                    case rand:uniform() < (1 / ShardCount) of
+                        true ->
+                            Arg = {?INDEX_CLEANUP, DbName},
+                            smoosh_server:enqueue(Arg);
+                        false ->
+                            ok
+                    end;
+                _ ->
+                    ok
+            catch
+                error:database_does_not_exist ->
+                    ok
+            end;
+        false ->
+            ok
+    end.
+
+should_clean_up_indices() ->
+    config:get_boolean("smoosh", "cleanup_index_files", true).
+
+suspend_pid(Pid) when is_pid(Pid) ->
+    catch erlang:suspend_process(Pid, [unless_suspending]).
+
+resume_pid(Pid) when is_pid(Pid) ->
+    catch erlang:resume_process(Pid).
diff --git a/src/smoosh/src/smoosh_persist.erl b/src/smoosh/src/smoosh_persist.erl
index 3b5418b04..b3ac1c43d 100644
--- a/src/smoosh/src/smoosh_persist.erl
+++ b/src/smoosh/src/smoosh_persist.erl
@@ -93,12 +93,12 @@ check_setup(true) ->
         ok -> ok;
         {error, Error2} -> throw({fail, "write", Error2})
     end,
-    couch_util:file_delete(Path).
+    delete_file(Path).
 
 write(#{} = QData, Path) when is_list(Path), map_size(QData) == 0 ->
     % Save a few bytes by deleting the persisted queue data if
     % there are no waiting/starting or active jobs
-    couch_util:file_delete(Path);
+    delete_file(Path);
 write(#{} = QData, Path) when is_list(Path) ->
     Bin = term_to_binary(QData, [compressed, {minor_version, 2}]),
     TmpPath = tmp_path(Path),
@@ -133,6 +133,13 @@ state_dir() ->
     Dir = config:get("smoosh", "state_dir", "."),
     filename:absname(Dir).
 
+delete_file(Path) ->
+    % On Erlang 24+ we can avoid using the file server
+    case erlang:function_exported(file, delete, 2) of
+        true -> file:delete(Path, [raw]);
+        false -> file:delete(Path)
+    end.
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 27277be04..215277b42 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -12,27 +12,25 @@
 
 -module(smoosh_server).
 -behaviour(gen_server).
--vsn(4).
 -behaviour(config_listener).
--include_lib("couch/include/couch_db.hrl").
+%-include_lib("couch/include/couch_db.hrl").
 
 % public api.
 -export([
     start_link/0,
     suspend/0,
     resume/0,
+    flush/0,
     enqueue/1,
     sync_enqueue/1,
-    sync_enqueue/2,
     handle_db_event/3,
     status/0
 ]).
 
--define(SECONDS_PER_MINUTE, 60).
-
 % gen_server api.
 -export([
     init/1,
+    handle_continue/2,
     handle_call/3,
     handle_cast/2,
     handle_info/2,
@@ -46,30 +44,39 @@
 % exported but for internal use.
 -export([enqueue_request/2]).
 -export([get_priority/2]).
-
-% exported for testing and debugging
--export([get_channel/1]).
+-export([access_cleaner/0]).
 
 -ifdef(TEST).
+-define(STALENESS_MIN, 0).
+-define(ACCESS_CLEAN_INTERVAL_MSEC, 300).
 -define(RELISTEN_DELAY, 50).
 -else.
+-define(STALENESS_MIN, 5).
+-define(ACCESS_CLEAN_INTERVAL_MSEC, 3000).
 -define(RELISTEN_DELAY, 5000).
 -endif.
 
+-define(INDEX_CLEANUP, index_cleanup).
+-define(ACCESS, smoosh_access).
+-define(ACCESS_MAX_SIZE, 1000000).
+-define(ACCESS_NEVER, -1 bsl 58).
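+
+% The ?ACCESS ETS table records the last time each object was handed to a
+% channel: enqueue/1, sync_enqueue/1 and find_channel/3 use stale_enough/1 to
+% skip objects touched more recently than the staleness threshold,
+% enqueue_request/2 stamps the object when it is enqueued, and the
+% access_cleaner process periodically purges entries older than the
+% threshold.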
+
 % private records.
 
 -record(state, {
     db_channels = [],
     view_channels = [],
-    tab,
+    cleanup_channels = [],
     event_listener,
     waiting = #{},
-    waiting_by_ref = #{}
+    waiting_by_ref = #{},
+    access_cleaner
 }).
 
 -record(channel, {
     name,
-    pid
+    pid,
+    stab
 }).
 
 % public functions.
@@ -78,22 +85,35 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 suspend() ->
-    gen_server:call(?MODULE, suspend).
+    gen_server:call(?MODULE, suspend, infinity).
 
 resume() ->
-    gen_server:call(?MODULE, resume).
+    gen_server:call(?MODULE, resume, infinity).
 
-status() ->
-    gen_server:call(?MODULE, status).
+flush() ->
+    gen_server:call(?MODULE, flush, infinity).
 
-enqueue(Object) ->
-    gen_server:cast(?MODULE, {enqueue, Object}).
+status() ->
+    try ets:foldl(fun get_channel_status/2, [], ?MODULE) of
+        Res -> {ok, Res}
+    catch
+        error:badarg ->
+            {ok, []}
+    end.
 
-sync_enqueue(Object) ->
-    gen_server:call(?MODULE, {enqueue, Object}).
+enqueue(Object0) ->
+    Object = smoosh_utils:validate_arg(Object0),
+    case stale_enough(Object) of
+        true -> gen_server:cast(?MODULE, {enqueue, Object});
+        false -> ok
+    end.
 
-sync_enqueue(Object, Timeout) ->
-    gen_server:call(?MODULE, {enqueue, Object}, Timeout).
+sync_enqueue(Object0) ->
+    Object = smoosh_utils:validate_arg(Object0),
+    case stale_enough(Object) of
+        true -> gen_server:call(?MODULE, {enqueue, Object}, infinity);
+        false -> ok
+    end.
 
 handle_db_event(DbName, local_updated, St) ->
     enqueue(DbName),
@@ -110,35 +130,29 @@ handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) ->
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
 
-% for testing and debugging only
-get_channel(ChannelName) ->
-    gen_server:call(?MODULE, {get_channel, ChannelName}).
-
 % gen_server functions.
 
 init([]) ->
     process_flag(trap_exit, true),
+    process_flag(message_queue_data, off_heap),
     ok = config:listen_for_changes(?MODULE, nil),
-    {ok, Pid} = start_event_listener(),
-    DbChannels = smoosh_utils:split(
-        config:get("smoosh", "db_channels", "upgrade_dbs,ratio_dbs,slack_dbs")
-    ),
-    ViewChannels = smoosh_utils:split(
-        config:get("smoosh", "view_channels", "upgrade_views,ratio_views,slack_views")
-    ),
-    Tab = ets:new(channels, [{keypos, #channel.name}]),
-    {ok,
-        create_missing_channels(#state{
-            db_channels = DbChannels,
-            view_channels = ViewChannels,
-            event_listener = Pid,
-            tab = Tab
-        })}.
-
-handle_config_change("smoosh", "db_channels", L, _, _) ->
-    {ok, gen_server:cast(?MODULE, {new_db_channels, smoosh_utils:split(L)})};
-handle_config_change("smoosh", "view_channels", L, _, _) ->
-    {ok, gen_server:cast(?MODULE, {new_view_channels, smoosh_utils:split(L)})};
+    Opts = [named_table, {read_concurrency, true}],
+    ets:new(?MODULE, Opts ++ [{keypos, #channel.name}]),
+    ets:new(?ACCESS, Opts ++ [{write_concurrency, true}, public]),
+    State = #state{
+        access_cleaner = spawn_link(?MODULE, access_cleaner, []),
+        db_channels = smoosh_utils:db_channels(),
+        view_channels = smoosh_utils:view_channels(),
+        cleanup_channels = smoosh_utils:cleanup_channels()
+    },
+    {ok, State, {continue, create_channels}}.
+
+handle_config_change("smoosh", "db_channels", _, _, _) ->
+    {ok, gen_server:cast(?MODULE, new_db_channels)};
+handle_config_change("smoosh", "view_channels", _, _, _) ->
+    {ok, gen_server:cast(?MODULE, new_view_channels)};
+handle_config_change("smoosh", "cleanup_channels", _, _, _) ->
+    {ok, gen_server:cast(?MODULE, new_cleanup_channels)};
 handle_config_change(_, _, _, _, _) ->
     {ok, nil}.
 
@@ -151,51 +165,58 @@ handle_config_terminate(_Server, _Reason, _State) ->
         restart_config_listener
     ).
 
-handle_call(status, _From, State) ->
-    Acc = ets:foldl(fun get_channel_status/2, [], State#state.tab),
-    {reply, {ok, Acc}, State};
+handle_continue(create_channels, #state{} = State) ->
+    % Warn users about smoosh persistence misconfiguration issues. Do it once
+    % on startup to avoid continuously spamming logs with errors.
+    smoosh_persist:check_setup(),
+    State1 = create_missing_channels(State),
+    {ok, Pid} = start_event_listener(),
+    {noreply, State1#state{event_listener = Pid}}.
+
 handle_call({enqueue, Object}, _From, State) ->
     {noreply, NewState} = handle_cast({enqueue, Object}, State),
     {reply, ok, NewState};
 handle_call(suspend, _From, State) ->
-    ets:foldl(
-        fun(#channel{name = Name, pid = P}, _) ->
-            Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-            couch_log:Level("Suspending ~p", [Name]),
-            smoosh_channel:suspend(P)
-        end,
-        0,
-        State#state.tab
-    ),
+    Fun = fun(#channel{name = Name, pid = P}, _) ->
+        Level = smoosh_utils:log_level("compaction_log_level", "debug"),
+        couch_log:Level("Suspending ~p", [Name]),
+        smoosh_channel:suspend(P)
+    end,
+    ets:foldl(Fun, ok, ?MODULE),
     {reply, ok, State};
 handle_call(resume, _From, State) ->
-    ets:foldl(
-        fun(#channel{name = Name, pid = P}, _) ->
-            Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-            couch_log:Level("Resuming ~p", [Name]),
-            smoosh_channel:resume(P)
-        end,
-        0,
-        State#state.tab
-    ),
+    Fun = fun(#channel{name = Name, pid = P}, _) ->
+        Level = smoosh_utils:log_level("compaction_log_level", "debug"),
+        couch_log:Level("Resuming ~p", [Name]),
+        smoosh_channel:resume(P)
+    end,
+    ets:foldl(Fun, ok, ?MODULE),
     {reply, ok, State};
-handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) ->
-    {reply, {ok, channel_pid(Tab, ChannelName)}, State}.
-
-handle_cast({new_db_channels, Channels}, State) ->
-    [
-        smoosh_channel:close(channel_pid(State#state.tab, C))
-     || C <- State#state.db_channels -- Channels
-    ],
-    {noreply, create_missing_channels(State#state{db_channels = Channels})};
-handle_cast({new_view_channels, Channels}, State) ->
-    [
-        smoosh_channel:close(channel_pid(State#state.tab, C))
-     || C <- State#state.view_channels -- Channels
-    ],
-    {noreply, create_missing_channels(State#state{view_channels = Channels})};
+handle_call(flush, _From, State) ->
+    Fun = fun(#channel{pid = P}, _) -> smoosh_channel:flush(P) end,
+    ets:foldl(Fun, ok, ?MODULE),
+    {reply, ok, State}.
+
+handle_cast(new_db_channels, #state{} = State) ->
+    Channels = smoosh_utils:db_channels(),
+    Closed = State#state.db_channels -- Channels,
+    [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+    State1 = State#state{db_channels = Channels},
+    {noreply, create_missing_channels(State1)};
+handle_cast(new_view_channels, #state{} = State) ->
+    Channels = smoosh_utils:view_channels(),
+    Closed = State#state.view_channels -- Channels,
+    [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+    State1 = State#state{view_channels = Channels},
+    {noreply, create_missing_channels(State1)};
+handle_cast(new_cleanup_channels, #state{} = State) ->
+    Channels = smoosh_utils:cleanup_channels(),
+    Closed = State#state.cleanup_channels -- Channels,
+    [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+    State1 = State#state{cleanup_channels = Channels},
+    {noreply, create_missing_channels(State1)};
 handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) ->
-    case maps:is_key(Object, Waiting) of
+    case is_map_key(Object, Waiting) of
         true ->
             {noreply, State};
         false ->
@@ -208,12 +229,17 @@ handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
     couch_log:Level("update notifier died ~p", [Reason]),
     {ok, Pid1} = start_event_listener(),
     {noreply, State#state{event_listener = Pid1}};
+handle_info({'EXIT', Pid, Reason}, #state{access_cleaner = Pid} = State) ->
+    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+    couch_log:Level("access cleaner died ~p", [Reason]),
+    Pid1 = spawn_link(?MODULE, access_cleaner, []),
+    {noreply, State#state{access_cleaner = Pid1}};
 handle_info({'EXIT', Pid, Reason}, State) ->
     Level = smoosh_utils:log_level("compaction_log_level", "notice"),
     couch_log:Level("~p ~p died ~p", [?MODULE, Pid, Reason]),
-    case ets:match_object(State#state.tab, #channel{pid = Pid, _ = '_'}) of
+    case ets:match_object(?MODULE, #channel{pid = Pid, _ = '_'}) of
         [#channel{name = Name}] ->
-            ets:delete(State#state.tab, Name);
+            ets:delete(?MODULE, Name);
         _ ->
             ok
     end,
@@ -226,13 +252,13 @@ handle_info(restart_config_listener, State) ->
 handle_info(_Msg, State) ->
     {noreply, State}.
 
-terminate(_Reason, State) ->
-    ets:foldl(
-        fun(#channel{pid = P}, _) -> smoosh_channel:close(P) end,
-        0,
-        State#state.tab
-    ),
-    ok.
+terminate(_Reason, #state{access_cleaner = CPid}) ->
+    catch unlink(CPid),
+    exit(CPid, kill),
+    Fun = fun(#channel{pid = P}, _) ->
+        smoosh_channel:close(P)
+    end,
+    ets:foldl(Fun, ok, ?MODULE).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -251,18 +277,11 @@ remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) ->
     {Ref, Waiting1} = maps:take(Object, Waiting),
     State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
 
-get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) ->
-    try gen_server:call(P, status) of
-        {ok, Status} ->
-            [{Name, Status} | Acc0];
-        _ ->
-            Acc0
-    catch
-        _:_ ->
-            Acc0
-    end;
-get_channel_status(_, Acc0) ->
-    Acc0.
+get_channel_status(#channel{name = Name, stab = Tab}, Acc) ->
+    Status = smoosh_channel:get_status(Tab),
+    [{Name, Status} | Acc];
+get_channel_status(_, Acc) ->
+    Acc.
 
 start_event_listener() ->
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]).
@@ -273,79 +292,92 @@ enqueue_request(State, Object) ->
             false ->
                 ok;
             {ok, Pid, Priority} ->
+                case ets:info(?ACCESS, size) of
+                    Size when Size =< ?ACCESS_MAX_SIZE ->
+                        Now = erlang:monotonic_time(second),
+                        true = ets:insert(?ACCESS, {Object, Now});
+                    _ ->
+                        ok
+                end,
                 smoosh_channel:enqueue(Pid, Object, Priority)
         end
     catch
-        Class:Exception:Stack ->
-            couch_log:warning(
-                "~s: ~p ~p for ~s : ~p",
-                [
-                    ?MODULE,
-                    Class,
-                    Exception,
-                    smoosh_utils:stringify(Object),
-                    Stack
-                ]
-            )
+        Tag:Exception:Stack ->
+            Args = [?MODULE, Tag, Exception, smoosh_utils:stringify(Object), Stack],
+            couch_log:warning("~s: ~p ~p for ~s : ~p", Args),
+            ok
     end.
 
-find_channel(#state{} = State, {Shard, GroupId}) ->
-    find_channel(State#state.tab, State#state.view_channels, {Shard, GroupId});
+find_channel(#state{} = State, {?INDEX_CLEANUP, DbName}) ->
+    find_channel(State, State#state.cleanup_channels, {?INDEX_CLEANUP, DbName});
+find_channel(#state{} = State, {Shard, GroupId}) when is_binary(Shard) ->
+    find_channel(State, State#state.view_channels, {Shard, GroupId});
 find_channel(#state{} = State, DbName) ->
-    find_channel(State#state.tab, State#state.db_channels, DbName).
+    find_channel(State, State#state.db_channels, DbName).
 
-find_channel(_Tab, [], _Object) ->
+find_channel(#state{} = _State, [], _Object) ->
     false;
-find_channel(Tab, [Channel | Rest], Object) ->
-    Pid = channel_pid(Tab, Channel),
-    LastUpdated = smoosh_channel:last_updated(Pid, Object),
-    StalenessInSec =
-        config:get_integer("smoosh", "staleness", 5) *
-            ?SECONDS_PER_MINUTE,
-    Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native),
-    Now = erlang:monotonic_time(),
-    Activated = smoosh_channel:is_activated(Pid),
-    StaleEnough = LastUpdated =:= false orelse Now - LastUpdated > Staleness,
-    case Activated andalso StaleEnough of
+find_channel(#state{} = State, [Channel | Rest], Object) ->
+    case stale_enough(Object) of
         true ->
             case smoosh_utils:ignore_db(Object) of
                 true ->
-                    find_channel(Tab, Rest, Object);
+                    find_channel(State, Rest, Object);
                 _ ->
                     case get_priority(Channel, Object) of
                         0 ->
-                            find_channel(Tab, Rest, Object);
+                            find_channel(State, Rest, Object);
                         Priority ->
-                            {ok, Pid, Priority}
+                            {ok, channel_pid(Channel), Priority}
                     end
             end;
         false ->
-            find_channel(Tab, Rest, Object)
+            find_channel(State, Rest, Object)
     end.
 
-channel_pid(Tab, Channel) ->
-    [#channel{pid = Pid}] = ets:lookup(Tab, Channel),
+stale_enough(Object) ->
+    LastUpdatedSec = last_updated(Object),
+    Staleness = erlang:monotonic_time(second) - LastUpdatedSec,
+    Staleness > min_staleness_sec().
+
+channel_pid(Channel) ->
+    [#channel{pid = Pid}] = ets:lookup(?MODULE, Channel),
     Pid.
 
-create_missing_channels(State) ->
-    create_missing_channels(State#state.tab, State#state.db_channels),
-    create_missing_channels(State#state.tab, State#state.view_channels),
+create_missing_channels(#state{} = State) ->
+    create_missing_channels_type(State#state.db_channels),
+    create_missing_channels_type(State#state.view_channels),
+    create_missing_channels_type(State#state.cleanup_channels),
     State.
 
-create_missing_channels(_Tab, []) ->
+create_missing_channels_type([]) ->
     ok;
-create_missing_channels(Tab, [Channel | Rest]) ->
-    case ets:lookup(Tab, Channel) of
+create_missing_channels_type([Channel | Rest]) ->
+    case ets:lookup(?MODULE, Channel) of
         [] ->
             {ok, Pid} = smoosh_channel:start_link(Channel),
-            true = ets:insert(Tab, [#channel{name = Channel, pid = Pid}]);
+            {ok, STab} = smoosh_channel:get_status_table(Pid),
+            Chan = #channel{
+                name = Channel,
+                pid = Pid,
+                stab = STab
+            },
+            true = ets:insert(?MODULE, Chan);
         _ ->
             ok
     end,
-    create_missing_channels(Tab, Rest).
+    create_missing_channels_type(Rest).
 
+get_priority(_Channel, {?INDEX_CLEANUP, DbName}) ->
+    try mem3:local_shards(mem3:dbname(DbName)) of
+        [_ | _] -> 1;
+        [] -> 0
+    catch
+        error:database_does_not_exist ->
+            0
+    end;
 get_priority(Channel, {Shard, GroupId}) ->
-    case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
+    try couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
         {ok, Pid} ->
             try
                 {ok, ViewInfo} = couch_index:get_info(Pid),
@@ -368,9 +400,10 @@ get_priority(Channel, {Shard, GroupId}) ->
                 [Channel, Shard, GroupId, Reason]
             ),
             0
+    catch
+        throw:{not_found, _} ->
+            0
     end;
-get_priority(Channel, DbName) when is_list(DbName) ->
-    get_priority(Channel, ?l2b(DbName));
 get_priority(Channel, DbName) when is_binary(DbName) ->
     case couch_db:open_int(DbName, []) of
         {ok, Db} ->
@@ -379,11 +412,8 @@ get_priority(Channel, DbName) when is_binary(DbName) ->
             after
                 couch_db:close(Db)
             end;
-        Error = {not_found, no_db_file} ->
-            couch_log:warning(
-                "~p: Error getting priority for ~p: ~p",
-                [Channel, DbName, Error]
-            ),
+        {not_found, no_db_file} ->
+            % It's expected that a db might be deleted while waiting in queue
             0
     end;
 get_priority(Channel, Db) ->
@@ -494,6 +524,31 @@ view_needs_upgrade(Props) ->
             Enabled andalso length(Versions) >= 2
     end.
 
+access_cleaner() ->
+    JitterMSec = rand:uniform(?ACCESS_CLEAN_INTERVAL_MSEC),
+    timer:sleep(?ACCESS_CLEAN_INTERVAL_MSEC + JitterMSec),
+    NowSec = erlang:monotonic_time(second),
+    Limit = NowSec - min_staleness_sec(),
+    Head = {'_', '$1'},
+    Guard = {'<', '$1', Limit},
+    ets:select_delete(?ACCESS, [{Head, [Guard], [true]}]),
+    access_cleaner().
+
+min_staleness_sec() ->
+    Min = config:get_integer("smoosh", "staleness", ?STALENESS_MIN),
+    Min * 60.
+
+last_updated(Object) ->
+    try ets:lookup(?ACCESS, Object) of
+        [{_, AccessSec}] ->
+            AccessSec;
+        [] ->
+            ?ACCESS_NEVER
+    catch
+        error:badarg ->
+            0
+    end.
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl
index 73876888b..ee1d66992 100644
--- a/src/smoosh/test/smoosh_tests.erl
+++ b/src/smoosh/test/smoosh_tests.erl
@@ -3,152 +3,365 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
-%% ==========
-%% Setup
-%% ----------
+smoosh_test_() ->
+    {
+        setup,
+        fun setup_all/0,
+        fun teardown_all/1,
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                %% ?TDEF_FE(t_default_channels),
+                %% ?TDEF_FE(t_channels_recreated_on_crash),
+                %% ?TDEF_FE(t_can_create_and_delete_channels),
+                ?TDEF_FE(t_db_is_enqueued, 20),
+                ?TDEF_FE(t_x)
+            ]
+        }
+    }.
+
+setup_all() ->
+    meck:new(smoosh_server, [passthrough]),
+    meck:new(smoosh_channel, [passthrough]),
+    Ctx = test_util:start_couch([fabric]),
+    config:set("query_server_config", "commit_freq", "0", false),
+    Ctx.
+
+teardown_all(Ctx) ->
+    catch application:stop(smoosh),
+    config:delete("query_server_config", "commit_freq", false),
+    test_util:stop(Ctx),
+    meck:unload().
 
-setup(ChannelType) ->
+setup() ->
+    config:set("smoosh", "persist", "true", false),
+    config:set("smoosh.ratio_dbs", "min_size", "1", false),
+    config:set("smoosh.ratio_dbs", "min_priority", "1", false),
     DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
-    couch_db:close(Db),
-    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-    smoosh_channel:flush(ChannelPid),
-    ok = config:set("smoosh", "persist", "true", false),
-    ok = config:set(config_section(ChannelType), "min_size", "1", false),
-    ok = config:set(config_section(ChannelType), "min_priority", "1", false),
+    fabric:create_db(DbName, [{q, 1}]),
+    {ok, _} = create_ddoc(DbName, <<"_design/foo">>, <<"bar">>),
+    {ok, _} = create_doc(DbName, <<"doc1">>, 10000000),
+    {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+    %{ok, _} = update_ddoc(DbName, <<"_design/foo">>, <<"bar">>),
+    %{ok, _} = delete_doc(DbName, <<"doc1">>),
+    %{ok, _} = fabric:query_view(DbName, <<"boo">>, <<"bar">>),
+    application:start(smoosh),
     DbName.
 
-teardown(ChannelType, DbName) ->
-    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
-    ok = config:delete("smoosh", "persist", false),
-    ok = config:delete(config_section(DbName), "min_size", false),
-    ok = config:delete(config_section(DbName), "min_priority", false),
-    meck:unload(),
-    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-    smoosh_channel:flush(ChannelPid),
+teardown(DbName) ->
+    flush(),
+    application:stop(smoosh),
+    fabric:delete_db(DbName),
+    meck:reset(smoosh_server),
+    meck:reset(smoosh_channel),
+    config:delete("smoosh", "db_channels", false),
+    config:delete("smoosh", "view_channels", false),
+    config:delete("smoosh", "cleanup_channels", false),
+    config:delete("smoosh", "persist", false),
+    config:delete("smoosh.ratio_dbs", "min_size", false),
+    config:delete("smoosh.ratio_dbs", "min_priority", false).
+
+t_default_channels(_) ->
+    % 3 default db channels + 3 default view channels + 1 index cleanup channel = 7
+    wait_for_channels(7),
+    ?assertMatch(
+        [
+            {"index_cleanup", _},
+            {"ratio_dbs", _},
+            {"ratio_views", _},
+            {"slack_dbs", _},
+            {"slack_views", _},
+            {"upgrade_dbs", _},
+            {"upgrade_views", _}
+        ],
+        status()
+    ).
+
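+% Killing a channel process should cause smoosh_server to restart it under a
+% new pid while keeping the full set of channels.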
+t_channels_recreated_on_crash(_) ->
+    wait_for_channels(7),
+    flush(),
+    RatioDbsPid = get_channel_pid("ratio_dbs"),
+    meck:reset(smoosh_channel),
+    exit(RatioDbsPid, kill),
+    meck:wait(1, smoosh_channel, start_link, 1, 3000),
+    wait_for_channels(7),
+    ?assertMatch([_, {"ratio_dbs", _} | _], status()),
+    ?assertNotEqual(RatioDbsPid, get_channel_pid("ratio_dbs")).
+
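+% Adding and removing channel names in the config should create and tear down
+% the corresponding channel processes.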
+t_can_create_and_delete_channels(_) ->
+    wait_for_channels(7),
+    flush(),
+    config:set("smoosh", "db_channels", "mychan1", false),
+    config:set("smoosh", "view_channels", "mychan2", false),
+    config:set("smoosh", "cleanup_channels", "mychan3", false),
+    wait_for_channels(10),
+    meck:reset(smoosh_channel),
+    config:delete("smoosh", "db_channels", false),
+    config:delete("smoosh", "view_channels", false),
+    config:delete("smoosh", "cleanup_channels", false),
+    wait_for_channels(7).
+
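+% With min_size and min_priority set to 1 for ratio_dbs, deleting the large
+% doc should make the shard eligible for compaction, so expect it to be
+% enqueued and then compacted.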
+t_db_is_enqueued(DbName) ->
+    wait_for_channels(7),
+    flush(),
+    meck:reset(smoosh_channel),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    ok = wait_compact_done(),
     ok.
 
-config_section(ChannelType) ->
-    "smoosh." ++ ChannelType.
+t_x(_DbName) ->
+    ok.
 
 %% ==========
-%% Tests
+%% Helpers
 %% ----------
 
-smoosh_test_() ->
-    {
-        "Testing smoosh",
-        {
-            setup,
-            fun() -> test_util:start_couch([smoosh]) end,
-            fun test_util:stop/1,
-            [
-                channels_tests(),
-                persistence_tests()
-            ]
-        }
-    }.
 
-persistence_tests() ->
-    Tests = [
-        fun should_persist_queue/2,
-        fun should_call_recover/2,
-        fun should_not_call_recover/2
-    ],
-    {
-        "Various persistence tests",
-        [
-            make_test_case("ratio_dbs", Tests)
-        ]
-    }.
 
-channels_tests() ->
-    Tests = [
-        fun should_enqueue/2
-    ],
-    {
-        "Various channels tests",
-        [
-            make_test_case("ratio_dbs", Tests)
-        ]
-    }.
 
-make_test_case(Type, Funs) ->
-    {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}.
-
-should_enqueue(ChannelType, DbName) ->
-    ?_test(begin
-        ok = grow_db_file(DbName, 300),
-        ok = wait_enqueue(ChannelType, DbName),
-        ?assert(is_enqueued(ChannelType, DbName)),
-        ok
-    end).
-
-should_persist_queue(ChannelType, DbName) ->
-    ?_test(begin
-        {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-        ok = grow_db_file(DbName, 300),
-        ok = wait_enqueue(ChannelType, DbName),
-        ok = smoosh_channel:persist(ChannelPid),
-        Q0 = channel_queue(ChannelType),
-        ok = application:stop(smoosh),
-        ok = application:start(smoosh),
-        Q1 = channel_queue(ChannelType),
-        % Assert that queues are not empty
-        ?assertNotEqual(Q0, smoosh_priority_queue:new(ChannelType)),
-        ?assertNotEqual(Q1, smoosh_priority_queue:new(ChannelType)),
-        ?assertEqual(Q0, Q1),
-        ok
-    end).
-
-should_call_recover(_ChannelType, _DbName) ->
-    ?_test(begin
-        ok = application:stop(smoosh),
-        ok = config:set("smoosh", "persist", "true", false),
-        meck:new(smoosh_priority_queue, [passthrough]),
-        ok = application:start(smoosh),
-        timer:sleep(1000),
-        ?assertNotEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
-        ok
-    end).
-
-should_not_call_recover(_ChannelType, _DbName) ->
-    ?_test(begin
-        ok = application:stop(smoosh),
-        ok = config:set("smoosh", "persist", "false", false),
-        meck:new(smoosh_priority_queue, [passthrough]),
-        ok = application:start(smoosh),
-        timer:sleep(1000),
-        ?assertEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
-        ok
-    end).
-
-grow_db_file(DbName, SizeInKb) ->
-    {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
-    Data = b64url:encode(crypto:strong_rand_bytes(SizeInKb * 1024)),
-    Body = {[{<<"value">>, Data}]},
-    Doc = #doc{id = <<"doc1">>, body = Body},
-    {ok, _} = couch_db:update_doc(Db, Doc, []),
-    couch_db:close(Db),
-    ok.
 
-is_enqueued(ChannelType, DbName) ->
-    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-    smoosh_channel:is_key(ChannelPid, DbName).
-
-wait_enqueue(ChannelType, DbName) ->
-    test_util:wait(
-        fun() ->
-            case is_enqueued(ChannelType, DbName) of
-                false ->
-                    wait;
-                true ->
-                    ok
-            end
-        end,
-        15000
-    ).
+
+create_doc(DbName, DocId, Size) ->
+    Data = b64url:encode(crypto:strong_rand_bytes(Size)),
+    Doc = #doc{id = DocId, body = {[{<<"value">>, Data}]}},
+    fabric:update_doc(DbName, Doc, [?ADMIN_CTX]).
+
+create_ddoc(DbName, DocId, ViewName) ->
+    MapFun = <<"function(doc) { emit(doc.value, null); }">>,
+    DDoc = couch_doc:from_json_obj(
+        {[
+            {<<"_id">>, DocId},
+            {<<"language">>, <<"javascript">>},
+            {<<"autoupdate">>, false},
+            {<<"views">>,
+                {[
+                    {ViewName,
+                        {[
+                            {<<"map">>, MapFun}
+                        ]}}
+                ]}}
+        ]}
+    ),
+    fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]).
+
+update_ddoc(DbName, DDName, ViewName) ->
+    {ok, DDoc0} = fabric:open_doc(DbName, DDName, [?ADMIN_CTX]),
+    MapFun = <<"function(doc) { emit(doc.value, 1); }">>,
+    DDoc = DDoc0#doc{
+        body =
+            {[
+                {<<"language">>, <<"javascript">>},
+                {<<"autoupdate">>, false},
+                {<<"views">>,
+                    {[
+                        {ViewName,
+                            {[
+                                {<<"map">>, MapFun}
+                            ]}}
+                    ]}}
+            ]}
+    },
+    fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]).
+
+delete_doc(DbName, DocId) ->
+    {ok, Doc0} = fabric:open_doc(DbName, DocId, [?ADMIN_CTX]),
+    Doc = Doc0#doc{deleted = true, body = {[{<<"gone">>, true}]}},
+    fabric:update_doc(DbName, Doc, [?ADMIN_CTX]).
+
+status() ->
+    {ok, Props} = smoosh_server:status(),
+    lists:keysort(1, Props).
+
+flush() ->
+    ok = smoosh_server:flush().
+
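+% Look up a channel's pid from the smoosh_server ets table, where channels
+% are stored as {channel, _, Pid, _} entries.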
+get_channel_pid(Chan) ->
+    [{channel, _, Pid, _}] = ets:lookup(smoosh_server, Chan),
+    Pid.
+
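+% Wait until smoosh_server:status/0 reports exactly N channels.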
+wait_for_channels(N) when is_integer(N), N >= 0 ->
+    WaitFun = fun() ->
+        case length(status()) of
+            N -> ok;
+            _ -> wait
+        end
+    end,
+    test_util:wait(WaitFun).
+
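+% Wait until the channel receives an {enqueue, ShardName, _} cast for the
+% single (q=1) shard of DbName.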
+wait_to_enqueue(DbName) ->
+    [Shard] = mem3:shards(DbName),
+    ShardName = mem3:name(Shard),
+    meck:wait(smoosh_channel, handle_cast, [{enqueue, ShardName, '_'}, '_'], 4000).
+
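+% Wait for the channel to receive a {_, {ok, _}} message in handle_info/2,
+% which is how the channel learns that a compaction job started.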
+wait_compact_start() ->
+    meck:wait(smoosh_channel, handle_info, [{'_', {ok, '_'}}, '_'], 4000).
+
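+% Wait for a 'DOWN' message with reason normal, i.e. a monitored compaction
+% process exiting successfully.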
+wait_compact_done() ->
+    NormalDownPat = {'DOWN', '_', '_', '_', normal},
+    meck:wait(smoosh_channel, handle_info, [NormalDownPat, '_'], 4000).
 
-channel_queue(ChannelType) ->
-    Q0 = smoosh_priority_queue:new(ChannelType),
-    smoosh_priority_queue:recover(Q0).
+%% server_state() ->
+%%     % matches #state{} record in smoosh_server.erl
+%%     {
+%%         state,
+%%         DbChans,
+%%         ViewChans,
+%%         CleanupChans,
+%%         _EventListner,
+%%         _Waiting,
+%%         _