You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/11/18 22:04:45 UTC

[couchdb] 04/05: Optimize smoosh

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 111f2616e1ef48fadb43dcc358bb8dabb69ad839
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Tue Nov 15 21:28:53 2022 -0500

    Optimize smoosh
    
    Clean up, optimize and increase test coverage for smoosh.
    
      * Use the new, simpler persistence module. Remove the extra `activated vs
        non-activated` states. Use the `handle_continue(...)` gen_server callback
        to avoid blocking smoosh application initialization. Then, let channels
        unpersist in their init functions, which run outside of general
        application initialization.
    
      * Add an index cleanup channel and enqueue index cleanup jobs by default.
    
      * Remove gen_server bottlenecks for status and last update calls. Instead,
        rely only on ets table lookups and gen_casts.
    
      * Add a few more `try ... catch` blocks to avoid crashing the channels.
    
      * Use maps to keep track of starting and active jobs in the channel.
    
      * Re-check priority before starting jobs. This is needed when jobs are
        un-persisted after a restart, since the database may have already been
        compacted or deleted.
    
      * Update periodic channel scheduled checks to have a common scheduling
        mechanism.
    
      * Quantize ratio priorities to keep about a single decimal digit of
        precision. This should reduce churn in the priority queue, where the same
        db is constantly added and removed for small, insignificant ratio changes
        like 2.0001 -> 2.0002. This works alongside a recent optimization in the
        smoosh priority queue module: if an item's priority is unchanged, the item
        is not removed and re-added; it just stays where it is, reducing CPU and
        heap memory churn.
    
      * Remove testing-only API functions to avoid cluttering the API.
    
      * Store messages off-heap for channels and smoosh_server.
    
      * Instead of a per-channel `last_updated` gen_server:call(...), use a single
        access ets table which is periodically cleaned of stale entries. As an
        optimization, check the last access time before enqueueing the shard.
        This avoids sending an extra message to smoosh_server.
    
      * As a protection mechanism against overload, cap the access table size at
        250k entries. Don't allow enqueueing more than that many entries during the
        configured `[smoosh] staleness = Minutes` period.
    
      * Increase test coverage from 60% to 90%
---
 src/smoosh/src/smoosh.erl         |  13 +-
 src/smoosh/src/smoosh_channel.erl | 815 +++++++++++++++++++-------------------
 src/smoosh/src/smoosh_server.erl  | 389 +++++++++++-------
 src/smoosh/test/smoosh_tests.erl  | 596 ++++++++++++++++++++++------
 4 files changed, 1100 insertions(+), 713 deletions(-)

diff --git a/src/smoosh/src/smoosh.erl b/src/smoosh/src/smoosh.erl
index 950500ffa..68e8d1828 100644
--- a/src/smoosh/src/smoosh.erl
+++ b/src/smoosh/src/smoosh.erl
@@ -16,7 +16,7 @@
 -include_lib("mem3/include/mem3.hrl").
 
 -export([suspend/0, resume/0, enqueue/1, status/0]).
--export([enqueue_all_dbs/0, enqueue_all_dbs/1, enqueue_all_views/0]).
+-export([enqueue_all_dbs/0, enqueue_all_views/0]).
 
 suspend() ->
     smoosh_server:suspend().
@@ -30,9 +30,6 @@ enqueue(Object) ->
 sync_enqueue(Object) ->
     smoosh_server:sync_enqueue(Object).
 
-sync_enqueue(Object, Timeout) ->
-    smoosh_server:sync_enqueue(Object, Timeout).
-
 status() ->
     smoosh_server:status().
 
@@ -44,14 +41,6 @@ enqueue_all_dbs() ->
         ok
     ).
 
-enqueue_all_dbs(Timeout) ->
-    fold_local_shards(
-        fun(#shard{name = Name}, _Acc) ->
-            sync_enqueue(Name, Timeout)
-        end,
-        ok
-    ).
-
 enqueue_all_views() ->
     fold_local_shards(
         fun(#shard{name = Name}, _Acc) ->
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index 50274f704..92fd3413b 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -12,52 +12,52 @@
 
 -module(smoosh_channel).
 -behaviour(gen_server).
--vsn(1).
--include_lib("couch/include/couch_db.hrl").
 
 % public api.
--export([start_link/1, close/1, suspend/1, resume/1, activate/1, get_status/1]).
--export([enqueue/3, last_updated/2, flush/1, is_key/2, is_activated/1]).
+-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
+-export([enqueue/3, flush/1]).
+-export([get_status_table/1]).
 
 % gen_server api.
 -export([
     init/1,
     handle_call/3,
     handle_cast/2,
-    handle_info/2,
-    terminate/2
+    handle_info/2
 ]).
 
--define(VSN, 1).
--define(CHECKPOINT_INTERVAL_IN_MSEC, 180000).
+-define(INDEX_CLEANUP, index_cleanup).
+-define(TIME_WINDOW_MSEC, 60 * 1000).
+-define(CHECKPOINT_INTERVAL_MSEC, 180000).
 
--ifndef(TEST).
--define(START_DELAY_IN_MSEC, 60000).
--define(ACTIVATE_DELAY_IN_MSEC, 30000).
+-ifdef(TEST).
+-define(RE_ENQUEUE_INTERVAL, 50).
+-define(STATUS_UPDATE_INTERVAL_MSEC, 490).
 -else.
--define(START_DELAY_IN_MSEC, 0).
--define(ACTIVATE_DELAY_IN_MSEC, 0).
--export([persist/1]).
+-define(RE_ENQUEUE_INTERVAL, 5000).
+-define(STATUS_UPDATE_INTERVAL_MSEC, 4900).
 -endif.
 
-% records.
-
-% When the state is set to activated = true, the channel has completed the state
-% recovery process that occurs on (re)start and is accepting new compaction jobs.
-% Note: if activated = false and a request for a new compaction job is received,
-% smoosh will enqueue this new job after the state recovery process has finished.
-% When the state is set to paused = false, the channel is actively compacting any
-% compaction jobs that are scheduled.
-% See operator_guide.md --> State diagram.
+% If persistence is configured, on startup the channel will try to load its
+% waiting queue from a persisted queue data file. Then, during every
+% CHECKPOINT_INTERVAL_MSEC, it will spawn an checkpointer process to write the
+% persisted queue state to disk.
 
 -record(state, {
-    active = [],
+    % Channel name
     name,
+    % smoosh_persisted_queue:new() object
     waiting,
+    % Paused flag. The channel starts in a the paused state.
     paused = true,
-    starting = [],
-    activated = false,
-    requests = []
+    % #{Key => Pid}
+    active = #{},
+    % #{Ref => Key}
+    starting = #{},
+    % Monitor reference of the checkpointer process
+    cref,
+    % ETS status table handle. Used to publish channel status.
+    stab
 }).
 
 % public functions.
@@ -69,19 +69,19 @@ suspend(ServerRef) ->
     gen_server:call(ServerRef, suspend).
 
 resume(ServerRef) ->
-    gen_server:call(ServerRef, resume_and_activate).
-
-activate(ServerRef) ->
-    gen_server:call(ServerRef, activate).
+    gen_server:call(ServerRef, resume).
 
 enqueue(ServerRef, Object, Priority) ->
     gen_server:cast(ServerRef, {enqueue, Object, Priority}).
 
-last_updated(ServerRef, Object) ->
-    gen_server:call(ServerRef, {last_updated, Object}).
-
-get_status(ServerRef) ->
-    gen_server:call(ServerRef, status).
+get_status(StatusTab) when is_reference(StatusTab) ->
+    try ets:lookup(StatusTab, status) of
+        [{status, Status}] -> Status;
+        [] -> []
+    catch
+        error:badarg ->
+            []
+    end.
 
 close(ServerRef) ->
     gen_server:call(ServerRef, close).
@@ -89,137 +89,92 @@ close(ServerRef) ->
 flush(ServerRef) ->
     gen_server:call(ServerRef, flush).
 
-is_key(ServerRef, Key) ->
-    gen_server:call(ServerRef, {is_key, Key}).
-
-is_activated(ServerRef) ->
-    gen_server:call(ServerRef, is_activated).
-
-% Only exported to force persistence in tests
-persist(ServerRef) ->
-    gen_server:call(ServerRef, persist).
+get_status_table(ServerRef) ->
+    gen_server:call(ServerRef, get_status_table).
 
 % gen_server functions.
 
 init(Name) ->
-    erlang:send_after(60 * 1000, self(), check_window),
     process_flag(trap_exit, true),
-    Waiting = smoosh_priority_queue:new(Name),
-    Persist = config:get_boolean("smoosh", "persist", false),
-    State =
-        case smoosh_utils:is_view_channel(Name) orelse Persist =:= false of
-            true ->
-                schedule_unpause(),
-                #state{name = Name, waiting = Waiting, paused = true, activated = true};
-            false ->
-                erlang:send_after(?START_DELAY_IN_MSEC, self(), start_recovery),
-                #state{name = Name, waiting = Waiting, paused = true, activated = false}
-        end,
-    {ok, State}.
-
-handle_call({last_updated, Object}, _From, State) ->
-    LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
-    {reply, LastUpdated, State};
-handle_call(suspend, _From, State) ->
-    #state{active = Active} = State,
-    [
-        catch erlang:suspend_process(Pid, [unless_suspending])
-     || {_, Pid} <- Active
-    ],
-    {reply, ok, State#state{paused = true}};
-handle_call(resume_and_activate, _From, State) ->
-    #state{active = Active} = State,
-    [catch erlang:resume_process(Pid) || {_, Pid} <- Active],
-    {reply, ok, State#state{paused = false, activated = true}};
-handle_call(activate, _From, State) ->
-    {reply, ok, State#state{activated = true}};
-handle_call(status, _From, State) ->
-    {reply,
-        {ok, [
-            {active, length(State#state.active)},
-            {starting, length(State#state.starting)},
-            {waiting, smoosh_priority_queue:info(State#state.waiting)}
-        ]},
-        State};
+    process_flag(message_queue_data, off_heap),
+    schedule_check_window(),
+    schedule_update_status(),
+    schedule_checkpoint(),
+    schedule_unpause(),
+    STab = ets:new(smoosh_stats, [{read_concurrency, true}]),
+    Waiting = unpersist(Name),
+    State = #state{name = Name, waiting = Waiting, stab = STab},
+    {ok, set_status(State)}.
+
+handle_call(get_status_table, _From, #state{} = State) ->
+    State1 = set_status(State),
+    {reply, {ok, State#state.stab}, State1};
+handle_call(suspend, _From, #state{} = State) ->
+    {reply, ok, do_suspend(State)};
+handle_call(resume, _From, #state{} = State) ->
+    {reply, ok, do_resume(State)};
 handle_call(close, _From, State) ->
     {stop, normal, ok, State};
 handle_call(flush, _From, #state{waiting = Q} = State) ->
-    {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}};
-handle_call({is_key, Key}, _From, #state{waiting = Waiting} = State) ->
-    {reply, smoosh_priority_queue:is_key(Key, Waiting), State};
-handle_call(is_activated, _From, #state{activated = Activated} = State0) ->
-    {reply, Activated, State0};
-handle_call(persist, _From, State) ->
-    persist_queue(State),
-    {reply, ok, State}.
-
-handle_cast({enqueue, _Object, 0}, #state{} = State) ->
-    {noreply, State};
-handle_cast({enqueue, Object, Priority}, #state{activated = true} = State) ->
-    {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))};
-handle_cast({enqueue, Object, Priority}, #state{activated = false, requests = Requests} = State0) ->
-    Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-    couch_log:Level(
-        "~p Channel is not activated yet. Adding ~p to requests with priority ~p.", [
-            ?MODULE,
-            Object,
-            Priority
-        ]
-    ),
-    {noreply, State0#state{requests = [{Object, Priority} | Requests]}}.
+    State1 = State#state{waiting = smoosh_priority_queue:flush(Q)},
+    smoosh_persist:persist(State1#state.waiting, #{}, #{}),
+    State2 = set_status(State1),
+    {reply, ok, State2}.
 
+handle_cast({enqueue, Object, Priority}, #state{} = State) ->
+    State1 = add_to_queue(Object, Priority, State),
+    {noreply, maybe_start_compaction(State1)}.
+
+handle_info({'DOWN', Ref, _, _, _}, #state{cref = Ref} = State) ->
+    {noreply, State#state{cref = undefined}};
 % We accept noproc here due to possibly having monitored a restarted compaction
 % pid after it finished.
-handle_info({'DOWN', Ref, _, Job, Reason}, State) when
+handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) when
     Reason == normal;
     Reason == noproc
 ->
     #state{active = Active, starting = Starting} = State,
-    {noreply,
-        maybe_start_compaction(
-            State#state{
-                active = lists:keydelete(Job, 2, Active),
-                starting = lists:keydelete(Ref, 1, Starting)
-            }
-        )};
-handle_info({'DOWN', Ref, _, Job, Reason}, State) ->
-    #state{active = Active0, starting = Starting0} = State,
-    case lists:keytake(Job, 2, Active0) of
-        {value, {Key, _Pid}, Active1} ->
-            State1 = maybe_remonitor_cpid(
-                State#state{active = Active1},
-                Key,
-                Reason
-            ),
-            {noreply, maybe_start_compaction(State1)};
-        false ->
-            case lists:keytake(Ref, 1, Starting0) of
-                {value, {_, Key}, Starting1} ->
-                    couch_log:warning("failed to start compaction of ~p: ~p", [
-                        smoosh_utils:stringify(Key),
-                        Reason
-                    ]),
-                    {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
-                    {noreply, maybe_start_compaction(State#state{starting = Starting1})};
-                false ->
+    Active1 = maps:filter(fun(_, Pid) -> Pid =/= Job end, Active),
+    Starting1 = maps:remove(Ref, Starting),
+    State1 = State#state{active = Active1, starting = Starting1},
+    {noreply, maybe_start_compaction(State1)};
+handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) ->
+    #state{name = Name, active = Active, starting = Starting} = State,
+    FoundActive = maps:filter(fun(_, Pid) -> Pid =:= Job end, Active),
+    case maps:to_list(FoundActive) of
+        [{Key, _Pid}] ->
+            Active1 = maps:without([Key], Active),
+            State1 = State#state{active = maps:without([Key], Active1)},
+            State2 = maybe_remonitor_cpid(State1, Key, Reason),
+            {noreply, maybe_start_compaction(State2)};
+        [] ->
+            case maps:take(Ref, Starting) of
+                {Key, Starting1} ->
+                    LogMsg = "~s : failed to start compaction of ~p: ~p",
+                    LogArgs = [Name, smoosh_utils:stringify(Key), Reason],
+                    couch_log:warning(LogMsg, LogArgs),
+                    re_enqueue(Key),
+                    State1 = State#state{starting = Starting1},
+                    {noreply, maybe_start_compaction(State1)};
+                error ->
                     {noreply, State}
             end
     end;
-handle_info({Ref, {ok, Pid}}, State) when is_reference(Ref) ->
-    case lists:keytake(Ref, 1, State#state.starting) of
-        {value, {_, Key}, Starting1} ->
+% This is the '$gen_call' response handling
+handle_info({Ref, {ok, Pid}}, #state{} = State) when is_reference(Ref) ->
+    #state{name = Name, active = Active, starting = Starting} = State,
+    case maps:take(Ref, Starting) of
+        {Key, Starting1} ->
             Level = smoosh_utils:log_level("compaction_log_level", "notice"),
-            couch_log:Level(
-                "~s: Started compaction for ~s",
-                [State#state.name, smoosh_utils:stringify(Key)]
-            ),
+            LogMsg = "~s: Started compaction for ~s",
+            LogArgs = [Name, smoosh_utils:stringify(Key)],
+            couch_log:Level(LogMsg, LogArgs),
             erlang:monitor(process, Pid),
             erlang:demonitor(Ref, [flush]),
-            {noreply, State#state{
-                active = [{Key, Pid} | State#state.active],
-                starting = Starting1
-            }};
-        false ->
+            Active1 = Active#{Key => Pid},
+            State1 = State#state{active = Active1, starting = Starting1},
+            {noreply, set_status(State1)};
+        error ->
             {noreply, State}
     end;
 handle_info(check_window, State) ->
@@ -235,245 +190,146 @@ handle_info(check_window, State) ->
                 State;
             {false, true} ->
                 % resume is always safe even if we did not previously suspend
-                {reply, ok, NewState} = handle_call(resume_and_activate, nil, State),
-                NewState;
+                do_resume(State);
+            {true, false} when StrictWindow =:= "true" ->
+                % suspend immediately
+                do_suspend(State);
             {true, false} ->
-                if
-                    StrictWindow =:= "true" ->
-                        {reply, ok, NewState} = handle_call(suspend, nil, State),
-                        NewState;
-                    true ->
-                        State#state{paused = true}
-                end
+                % prevent new jobs from starting, active ones keep running
+                State#state{paused = true}
         end,
-    erlang:send_after(60 * 1000, self(), check_window),
+    schedule_check_window(),
     {noreply, FinalState};
-handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) ->
-    RecActive = recover(active_file_name(Name)),
-    Waiting1 = lists:foldl(
-        fun(DbName, Acc) ->
-            case couch_db:exists(DbName) andalso couch_db:is_compacting(DbName) of
-                true ->
-                    Priority = smoosh_server:get_priority(Name, DbName),
-                    smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
-                false ->
-                    Acc
-            end
-        end,
-        Waiting0,
-        RecActive
-    ),
-    State1 = maybe_start_compaction(State0#state{paused = false, waiting = Waiting1}),
-    Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-    couch_log:Level(
-        "~p Previously active compaction jobs (if any) have been successfully recovered and restarted.",
-        [?MODULE]
-    ),
-    erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate),
-    {noreply, State1#state{paused = true}};
-handle_info(activate, State) ->
-    {noreply, activate_channel(State)};
-handle_info(persist, State) ->
-    ok = persist_and_reschedule(State),
+handle_info(update_status, #state{} = State) ->
+    schedule_update_status(),
+    {noreply, set_status(State)};
+handle_info(checkpoint, #state{cref = Ref} = State) when is_reference(Ref) ->
+    % If a checkpointer process is still running, don't start another one.
+    schedule_checkpoint(),
     {noreply, State};
-handle_info(pause, State) ->
-    {noreply, State#state{paused = true}};
+handle_info(checkpoint, #state{cref = undefined} = State) ->
+    % Start an asyncronous checkpoint process so we don't block the channel
+    #state{waiting = Waiting, active = Active, starting = Starting} = State,
+    Args = [Waiting, Active, Starting],
+    {_, Ref} = spawn_monitor(smoosh_persist, persist, Args),
+    schedule_checkpoint(),
+    {noreply, State#state{cref = Ref}};
 handle_info(unpause, State) ->
     {noreply, maybe_start_compaction(State#state{paused = false})}.
 
-terminate(_Reason, _State) ->
-    ok.
-
 % private functions.
 
-recover(FilePath) ->
-    case do_recover(FilePath) of
-        {ok, List} ->
-            List;
-        error ->
-            []
-    end.
-
-do_recover(FilePath) ->
-    case file:read_file(FilePath) of
-        {ok, Content} ->
-            <<Vsn, Binary/binary>> = Content,
-            try parse_state(Vsn, ?VSN, Binary) of
-                Term ->
-                    Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-                    couch_log:Level(
-                        "~p Successfully restored state file ~s", [?MODULE, FilePath]
-                    ),
-                    {ok, Term}
-            catch
-                error:Reason ->
-                    couch_log:error(
-                        "~p Invalid state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
-                    ),
-                    file:delete(FilePath),
-                    error
-            end;
-        {error, enoent} ->
-            Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-            couch_log:Level(
-                "~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
-            ),
-            error;
-        {error, Reason} ->
-            couch_log:error(
-                "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
-            ),
-            file:delete(FilePath),
-            error
-    end.
-
-parse_state(1, ?VSN, Binary) ->
-    erlang:binary_to_term(Binary, [safe]);
-parse_state(Vsn, ?VSN, _) ->
-    error({unsupported_version, Vsn}).
-
-persist_and_reschedule(State) ->
-    persist_queue(State),
-    erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist),
-    ok.
-
-persist_queue(#state{name = Name, active = Active, starting = Starting, waiting = Waiting}) ->
-    Active1 = lists:foldl(
-        fun({DbName, _}, Acc) ->
-            [DbName | Acc]
-        end,
-        [],
-        Active
-    ),
-    Starting1 = lists:foldl(
-        fun({_, DbName}, Acc) ->
-            [DbName | Acc]
-        end,
-        [],
-        Starting
-    ),
-    smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN),
-    smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN),
-    smoosh_priority_queue:write_to_file(Waiting).
-
-active_file_name(Name) ->
-    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").
-
-starting_file_name(Name) ->
-    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").
+unpersist(Name) ->
+    % Insert into the access table with a current
+    % timestamp to prevent the same dbs from being re-enqueued
+    % again after startup
+    Waiting = smoosh_persist:unpersist(Name),
+    MapFun = fun(Object, _Priority) ->
+        smoosh_server:update_access(Object)
+    end,
+    maps:map(MapFun, smoosh_priority_queue:to_map(Waiting)),
+    Waiting.
+
+% Periodically cache in status ets table to avoid having to block on gen_server
+% get_status() calls.
+%
+set_status(#state{} = State) ->
+    #state{active = Active, starting = Starting, waiting = Waiting} = State,
+    Status = [
+        {active, map_size(Active)},
+        {starting, map_size(Starting)},
+        {waiting, smoosh_priority_queue:info(Waiting)}
+    ],
+    true = ets:insert(State#state.stab, {status, Status}),
+    State.
 
 add_to_queue(Key, Priority, State) ->
-    #state{active = Active, waiting = Q} = State,
-    case lists:keymember(Key, 1, Active) of
+    #state{name = Name, active = Active, waiting = Q} = State,
+    case is_map_key(Key, Active) of
         true ->
             State;
         false ->
-            Capacity = list_to_integer(smoosh_utils:get(State#state.name, "capacity", "9999")),
+            Capacity = smoosh_utils:capacity(State#state.name),
             Level = smoosh_utils:log_level("compaction_log_level", "notice"),
-            couch_log:Level(
-                "~s: adding ~p to internal compactor queue with priority ~p",
-                [State#state.name, Key, Priority]
-            ),
-            State#state{
-                waiting = smoosh_priority_queue:in(Key, Priority, Priority, Capacity, Q)
-            }
+            LogMsg = "~s: enqueueing ~p to compact with priority ~p",
+            LogArgs = [Name, Key, Priority],
+            couch_log:Level(LogMsg, LogArgs),
+            Q1 = smoosh_priority_queue:in(Key, Priority, Capacity, Q),
+            State#state{waiting = Q1}
     end.
 
-maybe_activate(#state{activated = true} = State) ->
+maybe_start_compaction(#state{paused = true} = State) ->
     State;
-maybe_activate(State) ->
-    activate_channel(State).
-
-activate_channel(#state{name = Name, waiting = Waiting0, requests = Requests0} = State0) ->
-    RecStarting = recover(starting_file_name(Name)),
-    Starting = lists:foldl(
-        fun(DbName, Acc) ->
-            case couch_db:exists(DbName) of
-                true ->
-                    Priority = smoosh_server:get_priority(Name, DbName),
-                    smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
-                false ->
-                    Acc
-            end
-        end,
-        Waiting0,
-        RecStarting
-    ),
-    Waiting1 = smoosh_priority_queue:recover(Starting),
-    Requests1 = lists:reverse(Requests0),
-    Waiting2 = lists:foldl(
-        fun({DbName, Priority}, Acc) ->
-            case couch_db:exists(DbName) of
-                true ->
-                    smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
-                false ->
-                    Acc
-            end
-        end,
-        Waiting1,
-        Requests1
-    ),
-    State1 = maybe_start_compaction(State0#state{
-        waiting = Waiting2, paused = false, activated = true, requests = []
-    }),
-    ok = persist_and_reschedule(State1),
-    schedule_unpause(),
-    State1#state{paused = true}.
+maybe_start_compaction(#state{paused = false, name = Name} = State) ->
+    Concurrency = smoosh_utils:concurrency(Name),
+    maybe_start_compaction(Concurrency, State).
 
-maybe_start_compaction(#state{paused = true} = State) ->
+maybe_start_compaction(Concurrency, #state{active = A, starting = S} = State) when
+    map_size(A) + map_size(S) >= Concurrency
+->
     State;
-maybe_start_compaction(State) ->
-    Concurrency = list_to_integer(
-        smoosh_utils:get(
-            State#state.name,
-            "concurrency",
-            "1"
-        )
-    ),
-    if
-        length(State#state.active) + length(State#state.starting) < Concurrency ->
-            case smoosh_priority_queue:out(State#state.waiting) of
-                false ->
-                    maybe_activate(State);
-                {Key, Priority, Q} ->
-                    try
-                        State2 =
-                            case start_compact(State, Key) of
-                                false ->
-                                    State;
-                                State1 ->
-                                    Level = smoosh_utils:log_level(
-                                        "compaction_log_level",
-                                        "notice"
-                                    ),
-                                    couch_log:Level(
-                                        "~s: Starting compaction for ~s (priority ~p)",
-                                        [State#state.name, smoosh_utils:stringify(Key), Priority]
-                                    ),
-                                    State1
-                            end,
-                        maybe_start_compaction(State2#state{waiting = Q})
-                    catch
-                        Class:Exception ->
-                            couch_log:warning(
-                                "~s: ~p ~p for ~s",
-                                [
-                                    State#state.name,
-                                    Class,
-                                    Exception,
-                                    smoosh_utils:stringify(Key)
-                                ]
-                            ),
-                            maybe_start_compaction(State#state{waiting = Q})
-                    end
-            end;
-        true ->
+maybe_start_compaction(Concurrency, #state{} = State) ->
+    case smoosh_priority_queue:out(State#state.waiting) of
+        false ->
+            State;
+        {Key, Q} ->
+            State1 = State#state{waiting = Q},
+            % Re-check priority since by the time the object was in queue, or
+            % was un-persisted after a node was down, the db or ddoc might be
+            % long gone and we don't want to crash the channel attemping to
+            % compact it
+            State2 =
+                case priority(State1, Key) of
+                    0 -> State1;
+                    _ -> try_compact(State1, Key)
+                end,
+            maybe_start_compaction(Concurrency, State2)
+    end.
+
+priority(#state{name = Name}, Key) ->
+    try
+        smoosh_server:get_priority(Name, Key)
+    catch
+        Tag:Error ->
+            % We are being defensive as we don't want to crash the channel
+            Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+            LogMsg = "~s: Failed to get priority for ~s in queue ~p:~p",
+            LogArgs = [Name, smoosh_utils:stringify(Key), Tag, Error],
+            couch_log:Level(LogMsg, LogArgs),
+            0
+    end.
+
+try_compact(#state{name = Name} = State, Key) ->
+    try start_compact(State, Key) of
+        false ->
+            State;
+        #state{} = State1 ->
+            Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+            LogMsg = "~s: Starting compaction for ~s",
+            LogArgs = [Name, smoosh_utils:stringify(Key)],
+            couch_log:Level(LogMsg, LogArgs),
+            State1
+    catch
+        Class:Exception ->
+            LogArgs = [Name, Class, Exception, smoosh_utils:stringify(Key)],
+            couch_log:warning("~s: compaction error ~p:~p for ~s", LogArgs),
             State
     end.
 
-start_compact(State, DbName) when is_list(DbName) ->
-    start_compact(State, ?l2b(DbName));
-start_compact(State, DbName) when is_binary(DbName) ->
+start_compact(#state{} = State, {?INDEX_CLEANUP, DbName} = Key) ->
+    #state{name = Name, active = Active} = State,
+    case smoosh_utils:ignore_db(DbName) of
+        false ->
+            {Pid, _Ref} = spawn_monitor(fun() -> cleanup_index_files(DbName) end),
+            Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+            LogMsg = "~s: Starting index cleanup for ~s",
+            LogArgs = [Name, smoosh_utils:stringify(Key)],
+            couch_log:Level(LogMsg, LogArgs),
+            State#state{active = Active#{Key => Pid}};
+        _ ->
+            false
+    end;
+start_compact(#state{name = Name} = State, DbName) when is_binary(DbName) ->
     case couch_db:open_int(DbName, []) of
         {ok, Db} ->
             try
@@ -482,94 +338,221 @@ start_compact(State, DbName) when is_binary(DbName) ->
                 couch_db:close(Db)
             end;
         Error = {not_found, no_db_file} ->
-            couch_log:warning(
-                "Error starting compaction for ~p: ~p",
-                [smoosh_utils:stringify(DbName), Error]
-            ),
+            LogMsg = "~s : Error starting compaction for ~p: ~p",
+            LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+            couch_log:warning(LogMsg, LogArgs),
             false
     end;
-start_compact(State, {Shard, GroupId}) ->
+start_compact(#state{} = State, {Shard, GroupId} = Key) ->
+    #state{name = Name, starting = Starting} = State,
     case smoosh_utils:ignore_db({Shard, GroupId}) of
         false ->
-            DbName = mem3:dbname(Shard),
-            {ok, Pid} = couch_index_server:get_index(
-                couch_mrview_index, Shard, GroupId
-            ),
-            spawn(fun() -> cleanup_index_files(DbName, Shard) end),
-            Ref = erlang:monitor(process, Pid),
-            Pid ! {'$gen_call', {self(), Ref}, compact},
-            State#state{starting = [{Ref, {Shard, GroupId}} | State#state.starting]};
+            case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
+                {ok, Pid} ->
+                    schedule_cleanup_index_files(Shard),
+                    Ref = erlang:monitor(process, Pid),
+                    Pid ! {'$gen_call', {self(), Ref}, compact},
+                    State#state{starting = Starting#{Ref => Key}};
+                Error ->
+                    LogMsg = "~s : Error starting view compaction for ~p: ~p",
+                    LogArgs = [Name, smoosh_utils:stringify(Key), Error],
+                    couch_log:warning(LogMsg, LogArgs),
+                    false
+            end;
         _ ->
             false
     end;
-start_compact(State, Db) ->
-    case smoosh_utils:ignore_db(Db) of
+start_compact(#state{} = State, Db) ->
+    #state{name = Name, starting = Starting, active = Active} = State,
+    Key = couch_db:name(Db),
+    case smoosh_utils:ignore_db(Key) of
         false ->
-            DbPid = couch_db:get_pid(Db),
-            Key = couch_db:name(Db),
             case couch_db:get_compactor_pid(Db) of
                 nil ->
+                    DbPid = couch_db:get_pid(Db),
                     Ref = erlang:monitor(process, DbPid),
                     DbPid ! {'$gen_call', {self(), Ref}, start_compact},
-                    State#state{starting = [{Ref, Key} | State#state.starting]};
+                    State#state{starting = Starting#{Ref => Key}};
                 % Compaction is already running, so monitor existing compaction pid.
-                CPid ->
-                    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
-                    couch_log:Level(
-                        "Db ~s continuing compaction",
-                        [smoosh_utils:stringify(Key)]
-                    ),
+                CPid when is_pid(CPid) ->
                     erlang:monitor(process, CPid),
-                    State#state{active = [{Key, CPid} | State#state.active]}
+                    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+                    LogMsg = "~s : db ~s continuing compaction",
+                    LogArgs = [Name, smoosh_utils:stringify(Key)],
+                    couch_log:Level(LogMsg, LogArgs),
+                    State#state{active = Active#{Key => CPid}}
             end;
         _ ->
             false
     end.
 
+% Called when a compaction job for a key exits. For a database key, if
+% couch_db reports a compactor pid it is monitored again and tracked in
+% `active`. If there is no compactor, or the lookup fails, the exit is logged
+% and the database is re-enqueued. A missing database is logged and dropped.
+% Non-database keys are always logged and re-enqueued.
-maybe_remonitor_cpid(State, DbName, Reason) when is_binary(DbName) ->
+maybe_remonitor_cpid(#state{} = State, DbName, Reason) when is_binary(DbName) ->
+    #state{name = Name, active = Active} = State,
     case couch_db:open_int(DbName, []) of
         {ok, Db} ->
-            case couch_db:get_compactor_pid_sync(Db) of
+            try couch_db:get_compactor_pid_sync(Db) of
                 nil ->
-                    couch_log:warning(
-                        "exit for compaction of ~p: ~p",
-                        [smoosh_utils:stringify(DbName), Reason]
-                    ),
-                    {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [DbName]),
+                    LogMsg = "~s : exit for compaction of ~p: ~p",
+                    LogArgs = [Name, smoosh_utils:stringify(DbName), Reason],
+                    couch_log:warning(LogMsg, LogArgs),
+                    re_enqueue(DbName),
                     State;
-                CPid ->
-                    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
-                    couch_log:Level(
-                        "~s compaction already running. Re-monitor Pid ~p",
-                        [smoosh_utils:stringify(DbName), CPid]
-                    ),
+                CPid when is_pid(CPid) ->
                     erlang:monitor(process, CPid),
-                    State#state{active = [{DbName, CPid} | State#state.active]}
+                    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+                    LogMsg = "~s: ~s compaction already running. Re-monitor Pid ~p",
+                    LogArgs = [Name, smoosh_utils:stringify(DbName), CPid],
+                    couch_log:Level(LogMsg, LogArgs),
+                    State#state{active = Active#{DbName => CPid}}
+            catch
+                _:Error ->
+                    LogMsg = "~s: error remonitoring db compaction ~p error:~p",
+                    LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+                    couch_log:warning(LogMsg, LogArgs),
+                    re_enqueue(DbName),
+                    State
+            after
+                % Always release the handle opened by open_int/2 above, as
+                % start_compact/2 does. Only CPid needs to stay monitored.
+                couch_db:close(Db)
             end;
         Error = {not_found, no_db_file} ->
-            couch_log:warning(
-                "exit for compaction of ~p: ~p",
-                [smoosh_utils:stringify(DbName), Error]
-            ),
+            LogMsg = "~s : exit for compaction of ~p: ~p",
+            LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+            couch_log:warning(LogMsg, LogArgs),
             State
     end;
 % not a database compaction, so ignore the pid check
-maybe_remonitor_cpid(State, Key, Reason) ->
-    couch_log:warning(
-        "exit for compaction of ~p: ~p",
-        [smoosh_utils:stringify(Key), Reason]
-    ),
-    {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+maybe_remonitor_cpid(#state{name = Name} = State, Key, Reason) ->
+    LogMsg = "~s: exit for compaction of ~p: ~p",
+    LogArgs = [Name, smoosh_utils:stringify(Key), Reason],
+    couch_log:warning(LogMsg, LogArgs),
+    re_enqueue(Key),
     State.
 
+% Send `check_window` to this process after ?TIME_WINDOW_MSEC milliseconds.
+schedule_check_window() ->
+    erlang:send_after(?TIME_WINDOW_MSEC, self(), check_window).
+
+% Send `update_status` to this process after ?STATUS_UPDATE_INTERVAL_MSEC
+% milliseconds.
+schedule_update_status() ->
+    erlang:send_after(?STATUS_UPDATE_INTERVAL_MSEC, self(), update_status).
+
+% Send `unpause` to this process after `[smoosh] wait_secs` seconds
+% (default 30).
 schedule_unpause() ->
-    WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
+    WaitSecs = config:get_integer("smoosh", "wait_secs", 30),
     erlang:send_after(WaitSecs * 1000, self(), unpause).
 
-cleanup_index_files(DbName, _Shard) ->
-    case config:get("smoosh", "cleanup_index_files", "false") of
-        "true" ->
-            fabric:cleanup_index_files(DbName);
+% Send `checkpoint` to this process after ?CHECKPOINT_INTERVAL_MSEC
+% milliseconds.
+schedule_checkpoint() ->
+    erlang:send_after(?CHECKPOINT_INTERVAL_MSEC, self(), checkpoint).
+
+% Ask smoosh_server to enqueue Obj again after ?RE_ENQUEUE_INTERVAL ms. A
+% timer delivers a raw '$gen_cast' {enqueue, Obj} message. This is a no-op
+% when smoosh_server is not registered. The timer targets the server's
+% current pid, so the retry is lost if that process exits before it fires.
+re_enqueue(Obj) ->
+    case whereis(smoosh_server) of
+        Pid when is_pid(Pid) ->
+            Cast = {'$gen_cast', {enqueue, Obj}},
+            erlang:send_after(?RE_ENQUEUE_INTERVAL, Pid, Cast),
+            ok;
         _ ->
             ok
     end.
+
+% Run fabric's cluster-wide index file cleanup for DbName, unless disabled
+% via `[smoosh] cleanup_index_files`. Returns ok when disabled.
+cleanup_index_files(DbName) ->
+    case should_clean_up_indices() of
+        false -> ok;
+        true -> fabric:cleanup_index_files(DbName)
+    end.
+
+% Maybe enqueue an index cleanup job for the database that owns Shard.
+% Cleanup works on the whole clustered database, so each local shard
+% enqueues it with probability 1 / (number of local shards). If every local
+% shard passes through here once, about one job is expected in total.
+schedule_cleanup_index_files(Shard) ->
+    case should_clean_up_indices() of
+        false ->
+            ok;
+        true ->
+            DbName = mem3:dbname(Shard),
+            try length(mem3:local_shards(DbName)) of
+                Count when Count >= 1 ->
+                    Chosen = rand:uniform() < 1 / Count,
+                    if
+                        Chosen -> smoosh_server:enqueue({?INDEX_CLEANUP, DbName});
+                        true -> ok
+                    end;
+                _ ->
+                    ok
+            catch
+                % Presumably the database was deleted in the meantime.
+                error:database_does_not_exist ->
+                    ok
+            end
+    end.
+
+% True unless `[smoosh] cleanup_index_files` is set to false.
+should_clean_up_indices() ->
+    config:get_boolean("smoosh", "cleanup_index_files", true).
+
+% Suspend every active job process and mark the channel as paused.
+do_suspend(#state{active = Active} = State) ->
+    lists:foreach(fun suspend_pid/1, maps:values(Active)),
+    State#state{paused = true}.
+
+% Resume every active job process and mark the channel as no longer paused.
+do_resume(#state{active = Active} = State) ->
+    lists:foreach(fun resume_pid/1, maps:values(Active)),
+    State#state{paused = false}.
+
+% Best-effort suspend of a job process. The process may already have exited,
+% in which case erlang:suspend_process/2 raises an error (e.g. badarg). The
+% error is ignored and false is returned so the channel does not crash.
+suspend_pid(Pid) when is_pid(Pid) ->
+    try
+        erlang:suspend_process(Pid, [unless_suspending])
+    catch
+        error:_ -> false
+    end.
+
+% Best-effort resume of a job process. Errors are ignored and false is
+% returned; erlang:resume_process/1 raises badarg, for example, when the
+% process is gone or was not suspended by this process.
+resume_pid(Pid) when is_pid(Pid) ->
+    try
+        erlang:resume_process(Pid)
+    catch
+        error:_ -> false
+    end.
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+% try_compact/2 must log (not crash) when a compaction cannot be started.
+start_compact_errors_test_() ->
+    {
+        foreach,
+        fun setup_start_compact/0,
+        fun teardown_start_compact/1,
+        [
+            ?TDEF_FE(t_start_db_with_missing_db),
+            ?TDEF_FE(t_start_view_with_missing_db),
+            ?TDEF_FE(t_start_view_with_missing_index),
+            ?TDEF_FE(t_start_compact_throws)
+        ]
+    }.
+
+% Mock couch_log, couch_db and smoosh_utils (passthrough, so they keep their
+% real behavior), then start couch with one freshly created database.
+setup_start_compact() ->
+    Mocked = [couch_log, couch_db, smoosh_utils],
+    lists:foreach(fun(Mod) -> meck:new(Mod, [passthrough]) end, Mocked),
+    Ctx = test_util:start_couch(),
+    TmpDbName = ?tempdb(),
+    {ok, Db} = couch_server:create(TmpDbName, []),
+    couch_db:close(Db),
+    {Ctx, TmpDbName}.
+
+teardown_start_compact({Ctx, TmpDbName}) ->
+    couch_server:delete(TmpDbName, []),
+    test_util:stop_couch(Ctx),
+    meck:unload().
+
+% Reset the couch_log call history, attempt the compaction and expect exactly
+% one couch_log:warning/2 call.
+assert_one_warning(#state{} = State, Key) ->
+    meck:reset(couch_log),
+    try_compact(State, Key),
+    ?assertEqual(1, meck:num_calls(couch_log, warning, 2)).
+
+t_start_db_with_missing_db({_, _}) ->
+    assert_one_warning(#state{name = "ratio_dbs"}, <<"missing_db">>).
+
+t_start_view_with_missing_db({_, _}) ->
+    Key = {<<"missing_db">>, <<"_design/nope">>},
+    assert_one_warning(#state{name = "ratio_views"}, Key).
+
+t_start_view_with_missing_index({_, DbName}) ->
+    Key = {DbName, <<"_design/nope">>},
+    assert_one_warning(#state{name = "ratio_views"}, Key).
+
+t_start_compact_throws({_, _}) ->
+    % Make start_compact/2 raise by forcing smoosh_utils:ignore_db/1 to fail.
+    meck:expect(smoosh_utils, ignore_db, 1, meck:raise(error, foo)),
+    Key = {<<"some_db">>, <<"_design/some_view">>},
+    assert_one_warning(#state{name = "ratio_dbs"}, Key).
+
+-endif.
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 27277be04..10368a549 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -12,27 +12,24 @@
 
 -module(smoosh_server).
 -behaviour(gen_server).
--vsn(4).
 -behaviour(config_listener).
--include_lib("couch/include/couch_db.hrl").
 
 % public api.
 -export([
     start_link/0,
     suspend/0,
     resume/0,
+    flush/0,
     enqueue/1,
     sync_enqueue/1,
-    sync_enqueue/2,
     handle_db_event/3,
     status/0
 ]).
 
--define(SECONDS_PER_MINUTE, 60).
-
 % gen_server api.
 -export([
     init/1,
+    handle_continue/2,
     handle_call/3,
     handle_cast/2,
     handle_info/2,
@@ -44,32 +41,44 @@
 -export([handle_config_change/5, handle_config_terminate/3]).
 
 % exported but for internal use.
--export([enqueue_request/2]).
--export([get_priority/2]).
-
-% exported for testing and debugging
--export([get_channel/1]).
+-export([
+    enqueue_request/2,
+    get_priority/2,
+    update_access/1,
+    access_cleaner/0
+]).
 
 -ifdef(TEST).
+% STALENESS_MIN is in minutes: it is the default for `[smoosh] staleness`
+% and min_staleness_sec/0 multiplies it by 60.
+% NOTE(review): "ACCEESS" is a misspelling of "ACCESS". It is kept because
+% access_cleaner/0 references the macro under this name.
+-define(STALENESS_MIN, 0).
+-define(ACCEESS_CLEAN_INTERVAL_MSEC, 300).
 -define(RELISTEN_DELAY, 50).
 -else.
+-define(STALENESS_MIN, 5).
+-define(ACCEESS_CLEAN_INTERVAL_MSEC, 3000).
 -define(RELISTEN_DELAY, 5000).
 -endif.
 
+% Tag of index cleanup job keys: {?INDEX_CLEANUP, DbName}.
+-define(INDEX_CLEANUP, index_cleanup).
+% Named ETS table of {Object, LastAccessSec} (monotonic seconds), used by
+% stale_enough/1 to rate-limit enqueues of the same object.
+-define(ACCESS, smoosh_access).
+% Once ?ACCESS holds more entries than this, enqueue_request/2 stops
+% recording accesses.
+-define(ACCESS_MAX_SIZE, 250000).
+% Timestamp used for objects with no ?ACCESS entry. It lies far enough in
+% the past that such objects always count as stale.
+-define(ACCESS_NEVER, -1 bsl 58).
+
 % private records.
 
 -record(state, {
     db_channels = [],
     view_channels = [],
-    tab,
+    cleanup_channels = [],
     event_listener,
+    % `waiting` maps Object => Ref (see remove_enqueue_ref/2);
+    % `waiting_by_ref` is presumably the reverse Ref => Object index.
     waiting = #{},
-    waiting_by_ref = #{}
+    waiting_by_ref = #{},
+    % Pid of the linked access_cleaner/0 process.
+    access_cleaner
 }).
 
 -record(channel, {
     name,
-    pid
+    pid,
+    % Status ETS table returned by smoosh_channel:get_status_table/1.
+    stab
 }).
 
 % public functions.
@@ -78,22 +87,35 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
+% Suspend every channel. Waits without timeout for the server to reply.
 suspend() ->
-    gen_server:call(?MODULE, suspend).
+    gen_server:call(?MODULE, suspend, infinity).
 
+% Resume every channel. Waits without timeout for the server to reply.
 resume() ->
-    gen_server:call(?MODULE, resume).
+    gen_server:call(?MODULE, resume, infinity).
 
-status() ->
-    gen_server:call(?MODULE, status).
+% Flush every channel via smoosh_channel:flush/1.
+flush() ->
+    gen_server:call(?MODULE, flush, infinity).
 
-enqueue(Object) ->
-    gen_server:cast(?MODULE, {enqueue, Object}).
+% Return {ok, [{ChannelName, Status}]}, read straight from the ETS tables
+% without calling the server. Any badarg during the fold (e.g. the channels
+% table does not exist because smoosh_server is not running) yields {ok, []}.
+status() ->
+    try ets:foldl(fun get_channel_status/2, [], ?MODULE) of
+        Res -> {ok, Res}
+    catch
+        error:badarg ->
+            {ok, []}
+    end.
 
-sync_enqueue(Object) ->
-    gen_server:call(?MODULE, {enqueue, Object}).
+% Asynchronously enqueue Object, after normalizing it with
+% smoosh_utils:validate_arg/1. An object that is not stale_enough/1 is
+% skipped here, before any message reaches the server.
+enqueue(Object0) ->
+    Object = smoosh_utils:validate_arg(Object0),
+    case stale_enough(Object) of
+        true -> gen_server:cast(?MODULE, {enqueue, Object});
+        false -> ok
+    end.
 
-sync_enqueue(Object, Timeout) ->
-    gen_server:call(?MODULE, {enqueue, Object}, Timeout).
+% Like enqueue/1, but waits without timeout for the server to handle the
+% request. Returns ok, including when the object is skipped as not stale.
+sync_enqueue(Object0) ->
+    Object = smoosh_utils:validate_arg(Object0),
+    case stale_enough(Object) of
+        true -> gen_server:call(?MODULE, {enqueue, Object}, infinity);
+        false -> ok
+    end.
 
 handle_db_event(DbName, local_updated, St) ->
     enqueue(DbName),
@@ -110,35 +132,29 @@ handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) ->
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
 
-% for testing and debugging only
-get_channel(ChannelName) ->
-    gen_server:call(?MODULE, {get_channel, ChannelName}).
-
 % gen_server functions.
 
+% Create the ETS tables and the access cleaner. Channels and the event
+% listener are started afterwards in handle_continue/2.
 init([]) ->
     process_flag(trap_exit, true),
+    % Keep pending messages outside the process heap, so a long message queue
+    % (e.g. a burst of enqueue casts) is cheaper to garbage collect.
+    process_flag(message_queue_data, off_heap),
     ok = config:listen_for_changes(?MODULE, nil),
-    {ok, Pid} = start_event_listener(),
-    DbChannels = smoosh_utils:split(
-        config:get("smoosh", "db_channels", "upgrade_dbs,ratio_dbs,slack_dbs")
-    ),
-    ViewChannels = smoosh_utils:split(
-        config:get("smoosh", "view_channels", "upgrade_views,ratio_views,slack_views")
-    ),
-    Tab = ets:new(channels, [{keypos, #channel.name}]),
-    {ok,
-        create_missing_channels(#state{
-            db_channels = DbChannels,
-            view_channels = ViewChannels,
-            event_listener = Pid,
-            tab = Tab
-        })}.
-
-handle_config_change("smoosh", "db_channels", L, _, _) ->
-    {ok, gen_server:cast(?MODULE, {new_db_channels, smoosh_utils:split(L)})};
-handle_config_change("smoosh", "view_channels", L, _, _) ->
-    {ok, gen_server:cast(?MODULE, {new_view_channels, smoosh_utils:split(L)})};
+    % ?MODULE: named channel registry keyed by #channel.name. It is protected
+    % (the ets default), so only this process writes it, while other
+    % processes such as callers of status/0 can read it.
+    % ?ACCESS: public, because the separate access_cleaner process deletes
+    % entries from it.
+    Opts = [named_table, {read_concurrency, true}],
+    ets:new(?MODULE, Opts ++ [{keypos, #channel.name}]),
+    ets:new(?ACCESS, Opts ++ [{write_concurrency, true}, public]),
+    State = #state{
+        access_cleaner = spawn_link(?MODULE, access_cleaner, []),
+        db_channels = smoosh_utils:db_channels(),
+        view_channels = smoosh_utils:view_channels(),
+        cleanup_channels = smoosh_utils:cleanup_channels()
+    },
+    % Channel startup is deferred to handle_continue/2 so init/1 returns
+    % without waiting for it.
+    {ok, State, {continue, create_channels}}.
+
+% React to changes of the channel-list settings by casting a matching
+% new_*_channels message. The cast handler re-reads the list through
+% smoosh_utils, so the new value passed in here is ignored. Changes to any
+% other key are ignored as well.
+handle_config_change("smoosh", "db_channels", _, _, _) ->
+    {ok, gen_server:cast(?MODULE, new_db_channels)};
+handle_config_change("smoosh", "view_channels", _, _, _) ->
+    {ok, gen_server:cast(?MODULE, new_view_channels)};
+handle_config_change("smoosh", "cleanup_channels", _, _, _) ->
+    {ok, gen_server:cast(?MODULE, new_cleanup_channels)};
 handle_config_change(_, _, _, _, _) ->
     {ok, nil}.
 
@@ -151,51 +167,58 @@ handle_config_terminate(_Server, _Reason, _State) ->
         restart_config_listener
     ).
 
-handle_call(status, _From, State) ->
-    Acc = ets:foldl(fun get_channel_status/2, [], State#state.tab),
-    {reply, {ok, Acc}, State};
+% Deferred from init/1: check the persistence setup, start the configured
+% channels and only then start the db event listener.
+% NOTE(review): presumably the listener starts last so that early db events
+% already find running channels; confirm.
+handle_continue(create_channels, #state{} = State) ->
+    % Warn users about smoosh persistence misconfiguration issues. Do it once
+    % on startup to avoid continuously spamming logs with errors.
+    smoosh_persist:check_setup(),
+    State1 = create_missing_channels(State),
+    {ok, Pid} = start_event_listener(),
+    {noreply, State1#state{event_listener = Pid}}.
+
+% Synchronous enqueue: handled exactly like the {enqueue, Object} cast, then
+% replies ok.
 handle_call({enqueue, Object}, _From, State) ->
     {noreply, NewState} = handle_cast({enqueue, Object}, State),
     {reply, ok, NewState};
+% suspend / resume / flush: apply the operation to every channel registered
+% in the ?MODULE table, then reply ok.
 handle_call(suspend, _From, State) ->
-    ets:foldl(
-        fun(#channel{name = Name, pid = P}, _) ->
-            Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-            couch_log:Level("Suspending ~p", [Name]),
-            smoosh_channel:suspend(P)
-        end,
-        0,
-        State#state.tab
-    ),
+    Fun = fun(#channel{name = Name, pid = P}, _) ->
+        Level = smoosh_utils:log_level("compaction_log_level", "debug"),
+        couch_log:Level("Suspending ~p", [Name]),
+        smoosh_channel:suspend(P)
+    end,
+    ets:foldl(Fun, ok, ?MODULE),
     {reply, ok, State};
 handle_call(resume, _From, State) ->
-    ets:foldl(
-        fun(#channel{name = Name, pid = P}, _) ->
-            Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-            couch_log:Level("Resuming ~p", [Name]),
-            smoosh_channel:resume(P)
-        end,
-        0,
-        State#state.tab
-    ),
+    Fun = fun(#channel{name = Name, pid = P}, _) ->
+        Level = smoosh_utils:log_level("compaction_log_level", "debug"),
+        couch_log:Level("Resuming ~p", [Name]),
+        smoosh_channel:resume(P)
+    end,
+    ets:foldl(Fun, ok, ?MODULE),
     {reply, ok, State};
-handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) ->
-    {reply, {ok, channel_pid(Tab, ChannelName)}, State}.
-
-handle_cast({new_db_channels, Channels}, State) ->
-    [
-        smoosh_channel:close(channel_pid(State#state.tab, C))
-     || C <- State#state.db_channels -- Channels
-    ],
-    {noreply, create_missing_channels(State#state{db_channels = Channels})};
-handle_cast({new_view_channels, Channels}, State) ->
-    [
-        smoosh_channel:close(channel_pid(State#state.tab, C))
-     || C <- State#state.view_channels -- Channels
-    ],
-    {noreply, create_missing_channels(State#state{view_channels = Channels})};
+handle_call(flush, _From, State) ->
+    Fun = fun(#channel{pid = P}, _) -> smoosh_channel:flush(P) end,
+    ets:foldl(Fun, ok, ?MODULE),
+    {reply, ok, State}.
+
+handle_cast(new_db_channels, #state{} = State) ->
+    Channels = smoosh_utils:db_channels(),
+    Closed = State#state.db_channels -- Channels,
+    [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+    State1 = State#state{db_channels = Channels},
+    {noreply, create_missing_channels(State1)};
+handle_cast(new_view_channels, #state{} = State) ->
+    Channels = smoosh_utils:view_channels(),
+    Closed = State#state.view_channels -- Channels,
+    [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+    State1 = State#state{view_channels = Channels},
+    {noreply, create_missing_channels(State1)};
+handle_cast(new_cleanup_channels, #state{} = State) ->
+    Channels = smoosh_utils:cleanup_channels(),
+    Closed = State#state.cleanup_channels -- Channels,
+    [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+    State1 = State#state{cleanup_channels = Channels},
+    {noreply, create_missing_channels(State1)};
 handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) ->
-    case maps:is_key(Object, Waiting) of
+    case is_map_key(Object, Waiting) of
         true ->
             {noreply, State};
         false ->
@@ -208,12 +231,17 @@ handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
     couch_log:Level("update notifier died ~p", [Reason]),
     {ok, Pid1} = start_event_listener(),
     {noreply, State#state{event_listener = Pid1}};
+handle_info({'EXIT', Pid, Reason}, #state{access_cleaner = Pid} = State) ->
+    Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+    couch_log:Level("access cleaner died ~p", [Reason]),
+    Pid1 = spawn_link(?MODULE, access_cleaner, []),
+    {noreply, State#state{access_cleaner = Pid1}};
 handle_info({'EXIT', Pid, Reason}, State) ->
     Level = smoosh_utils:log_level("compaction_log_level", "notice"),
     couch_log:Level("~p ~p died ~p", [?MODULE, Pid, Reason]),
-    case ets:match_object(State#state.tab, #channel{pid = Pid, _ = '_'}) of
+    case ets:match_object(?MODULE, #channel{pid = Pid, _ = '_'}) of
         [#channel{name = Name}] ->
-            ets:delete(State#state.tab, Name);
+            ets:delete(?MODULE, Name);
         _ ->
             ok
     end,
@@ -226,17 +254,22 @@ handle_info(restart_config_listener, State) ->
 handle_info(_Msg, State) ->
     {noreply, State}.
 
-terminate(_Reason, State) ->
-    ets:foldl(
-        fun(#channel{pid = P}, _) -> smoosh_channel:close(P) end,
-        0,
-        State#state.tab
-    ),
-    ok.
+% Stop the access cleaner, then close every registered channel. The cleaner
+% is unlinked first, so killing it does not send an exit signal back to this
+% (exit-trapping) process.
+terminate(_Reason, #state{access_cleaner = CPid}) ->
+    catch unlink(CPid),
+    exit(CPid, kill),
+    Fun = fun(#channel{pid = P}, _) ->
+        smoosh_channel:close(P)
+    end,
+    ets:foldl(Fun, ok, ?MODULE).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+% Record the current monotonic time, in seconds, as Object's last access in
+% the ?ACCESS table. stale_enough/1 measures staleness against this value.
+update_access(Object) ->
+    Now = erlang:monotonic_time(second),
+    true = ets:insert(?ACCESS, {Object, Now}),
+    ok.
+
 % private functions.
 
 add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) ->
@@ -251,18 +284,11 @@ remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) ->
     {Ref, Waiting1} = maps:take(Object, Waiting),
     State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
 
-get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) ->
-    try gen_server:call(P, status) of
-        {ok, Status} ->
-            [{Name, Status} | Acc0];
-        _ ->
-            Acc0
-    catch
-        _:_ ->
-            Acc0
-    end;
-get_channel_status(_, Acc0) ->
-    Acc0.
+% ets:foldl/3 callback for status/0. Prepends {ChannelName, Status}, reading
+% the status from the channel's status table instead of calling the channel.
+get_channel_status(#channel{name = Name, stab = Tab}, Acc) ->
+    Status = smoosh_channel:get_status(Tab),
+    [{Name, Status} | Acc];
+get_channel_status(_, Acc) ->
+    Acc.
 
+% Start a linked couch_event listener for all databases, with
+% ?MODULE:handle_db_event/3 as the callback.
 start_event_listener() ->
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]).
@@ -273,79 +299,92 @@ enqueue_request(State, Object) ->
             false ->
                 ok;
             {ok, Pid, Priority} ->
-                smoosh_channel:enqueue(Pid, Object, Priority)
+                case ets:info(?ACCESS, size) of
+                    Size when Size =< ?ACCESS_MAX_SIZE ->
+                        ok = update_access(Object);
+                    _ ->
+                        ok
+                end,
+                QuantizedPriority = quantize(Priority),
+                smoosh_channel:enqueue(Pid, Object, QuantizedPriority)
         end
     catch
-        Class:Exception:Stack ->
-            couch_log:warning(
-                "~s: ~p ~p for ~s : ~p",
-                [
-                    ?MODULE,
-                    Class,
-                    Exception,
-                    smoosh_utils:stringify(Object),
-                    Stack
-                ]
-            )
+        Tag:Exception:Stack ->
+            Args = [?MODULE, Tag, Exception, smoosh_utils:stringify(Object), Stack],
+            couch_log:warning("~s: ~p ~p for ~s : ~p", Args),
+            ok
     end.
 
+% Pick the channel list matching the kind of Object (index cleanup, view
+% {Shard, GroupId}, or database), then search it with find_channel/3.
-find_channel(#state{} = State, {Shard, GroupId}) ->
-    find_channel(State#state.tab, State#state.view_channels, {Shard, GroupId});
+find_channel(#state{} = State, {?INDEX_CLEANUP, DbName}) ->
+    find_channel(State, State#state.cleanup_channels, {?INDEX_CLEANUP, DbName});
+find_channel(#state{} = State, {Shard, GroupId}) when is_binary(Shard) ->
+    find_channel(State, State#state.view_channels, {Shard, GroupId});
 find_channel(#state{} = State, DbName) ->
-    find_channel(State#state.tab, State#state.db_channels, DbName).
+    find_channel(State, State#state.db_channels, DbName).
 
-find_channel(_Tab, [], _Object) ->
+% Return {ok, ChannelPid, Priority} for the first channel in Channels that
+% gives Object a non-zero priority. Return false when there is no such
+% channel, when Object was enqueued too recently (stale_enough/1), or when
+% Object is on the ignore list.
+find_channel(#state{} = _State, [], _Object) ->
     false;
-find_channel(Tab, [Channel | Rest], Object) ->
-    Pid = channel_pid(Tab, Channel),
-    LastUpdated = smoosh_channel:last_updated(Pid, Object),
-    StalenessInSec =
-        config:get_integer("smoosh", "staleness", 5) *
-            ?SECONDS_PER_MINUTE,
-    Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native),
-    Now = erlang:monotonic_time(),
-    Activated = smoosh_channel:is_activated(Pid),
-    StaleEnough = LastUpdated =:= false orelse Now - LastUpdated > Staleness,
-    case Activated andalso StaleEnough of
+find_channel(#state{} = _State, [_ | _] = Channels, Object) ->
+    % Staleness and the ignore list depend only on Object, so check them once
+    % here instead of once per candidate channel.
+    case stale_enough(Object) of
         true ->
             case smoosh_utils:ignore_db(Object) of
                 true ->
-                    find_channel(Tab, Rest, Object);
+                    false;
                 _ ->
-                    case get_priority(Channel, Object) of
-                        0 ->
-                            find_channel(Tab, Rest, Object);
-                        Priority ->
-                            {ok, Pid, Priority}
-                    end
+                    find_prioritized_channel(Channels, Object)
             end;
         false ->
-            find_channel(Tab, Rest, Object)
+            false
     end.
 
+% Walk Channels in order and return {ok, Pid, Priority} for the first one
+% whose get_priority/2 is non-zero, or false if every priority is 0.
+find_prioritized_channel([], _Object) ->
+    false;
+find_prioritized_channel([Channel | Rest], Object) ->
+    case get_priority(Channel, Object) of
+        0 ->
+            find_prioritized_channel(Rest, Object);
+        Priority ->
+            {ok, channel_pid(Channel), Priority}
+    end.
+
-channel_pid(Tab, Channel) ->
-    [#channel{pid = Pid}] = ets:lookup(Tab, Channel),
+% True when Object's last recorded access is at least min_staleness_sec()
+% seconds old. Objects without an ?ACCESS entry use ?ACCESS_NEVER and so
+% always qualify.
+stale_enough(Object) ->
+    LastUpdatedSec = last_updated(Object),
+    Staleness = erlang:monotonic_time(second) - LastUpdatedSec,
+    Staleness >= min_staleness_sec().
+
+% Pid of the registered channel named Channel. Crashes with badmatch if the
+% channel is not registered.
+channel_pid(Channel) ->
+    [#channel{pid = Pid}] = ets:lookup(?MODULE, Channel),
     Pid.
 
-create_missing_channels(State) ->
-    create_missing_channels(State#state.tab, State#state.db_channels),
-    create_missing_channels(State#state.tab, State#state.view_channels),
+% Start, and register in the ?MODULE table, every configured db, view and
+% cleanup channel that is not registered yet. Returns State unchanged.
+create_missing_channels(#state{} = State) ->
+    create_missing_channels_type(State#state.db_channels),
+    create_missing_channels_type(State#state.view_channels),
+    create_missing_channels_type(State#state.cleanup_channels),
     State.
 
-create_missing_channels(_Tab, []) ->
+% Start each channel in the list that has no ?MODULE entry yet. Stores its
+% pid and status table, so status/0 can read the status without a call.
+create_missing_channels_type([]) ->
     ok;
-create_missing_channels(Tab, [Channel | Rest]) ->
-    case ets:lookup(Tab, Channel) of
+create_missing_channels_type([Channel | Rest]) ->
+    case ets:lookup(?MODULE, Channel) of
         [] ->
             {ok, Pid} = smoosh_channel:start_link(Channel),
-            true = ets:insert(Tab, [#channel{name = Channel, pid = Pid}]);
+            {ok, STab} = smoosh_channel:get_status_table(Pid),
+            Chan = #channel{
+                name = Channel,
+                pid = Pid,
+                stab = STab
+            },
+            true = ets:insert(?MODULE, Chan);
         _ ->
             ok
     end,
-    create_missing_channels(Tab, Rest).
+    create_missing_channels_type(Rest).
 
+get_priority(_Channel, {?INDEX_CLEANUP, DbName}) ->
+    try mem3:local_shards(mem3:dbname(DbName)) of
+        [_ | _] -> 1;
+        [] -> 0
+    catch
+        error:database_does_not_exist ->
+            0
+    end;
 get_priority(Channel, {Shard, GroupId}) ->
-    case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
+    try couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
         {ok, Pid} ->
             try
                 {ok, ViewInfo} = couch_index:get_info(Pid),
@@ -368,9 +407,10 @@ get_priority(Channel, {Shard, GroupId}) ->
                 [Channel, Shard, GroupId, Reason]
             ),
             0
+    catch
+        throw:{not_found, _} ->
+            0
     end;
-get_priority(Channel, DbName) when is_list(DbName) ->
-    get_priority(Channel, ?l2b(DbName));
 get_priority(Channel, DbName) when is_binary(DbName) ->
     case couch_db:open_int(DbName, []) of
         {ok, Db} ->
@@ -379,11 +419,8 @@ get_priority(Channel, DbName) when is_binary(DbName) ->
             after
                 couch_db:close(Db)
             end;
-        Error = {not_found, no_db_file} ->
-            couch_log:warning(
-                "~p: Error getting priority for ~p: ~p",
-                [Channel, DbName, Error]
-            ),
+        {not_found, no_db_file} ->
+            % It's expected that a db might be deleted while waiting in queue
             0
     end;
 get_priority(Channel, Db) ->
@@ -494,6 +531,38 @@ view_needs_upgrade(Props) ->
             Enabled andalso length(Versions) >= 2
     end.
 
+% Body of the linked access-cleaner process spawned from init/1. It loops
+% forever: sleep one interval plus a random 1..interval ms of jitter, then
+% drop every ?ACCESS entry older than min_staleness_sec(). Such objects
+% already count as stale, so removing their entries changes no decision.
+access_cleaner() ->
+    IntervalMSec = ?ACCEESS_CLEAN_INTERVAL_MSEC,
+    timer:sleep(IntervalMSec + rand:uniform(IntervalMSec)),
+    NowSec = erlang:monotonic_time(second),
+    CutoffSec = NowSec - min_staleness_sec(),
+    % Entries are {Object, AccessSec}; match on the timestamp only.
+    MatchSpec = [{{'_', '$1'}, [{'<', '$1', CutoffSec}], [true]}],
+    _ = ets:select_delete(?ACCESS, MatchSpec),
+    access_cleaner().
+
+% Minimum age, in seconds, of an object's last access before it may be
+% enqueued again: `[smoosh] staleness` minutes (default ?STALENESS_MIN) * 60.
+min_staleness_sec() ->
+    StalenessMin = config:get_integer("smoosh", "staleness", ?STALENESS_MIN),
+    60 * StalenessMin.
+
+% Last recorded access time of Object in monotonic seconds, or ?ACCESS_NEVER
+% when it has no entry. Returns 0 when the ?ACCESS table does not exist.
+% NOTE(review): Erlang monotonic time may be negative, so 0 does not mean
+% "long ago" and stale_enough/1 may then return false. Confirm this is
+% the intended behavior when smoosh_server is not running.
+last_updated(Object) ->
+    try ets:lookup(?ACCESS, Object) of
+        [] ->
+            ?ACCESS_NEVER;
+        [{_Key, AccessSec}] ->
+            AccessSec
+    catch
+        error:badarg ->
+            0
+    end.
+
+% Round a priority to damp churn from tiny ratio changes. Integers pass
+% through unchanged. Floats below 16 are rounded to the nearest 1/16 and stay
+% floats. Floats of 16 or more are rounded to the nearest integer.
+quantize(Ratio) when is_integer(Ratio) ->
+    Ratio;
+quantize(Ratio) when is_float(Ratio) ->
+    case Ratio < 16 of
+        true -> round(Ratio * 16) / 16;
+        false -> round(Ratio)
+    end.
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
@@ -690,4 +759,16 @@ add_remove_enqueue_ref_test() ->
     % It's basically back to an initial (empty) state
     ?assertEqual(St1, #state{}).
 
+% Table of {Expected, Input} pairs for quantize/1, checked in order with
+% exact (=:=) comparison, so integer vs float results matter.
+quantize_test() ->
+    Cases = [
+        {0, 0},
+        {1, 1},
+        {0.0, 0.0},
+        {16, 16.0},
+        {15.0, 15.0},
+        {0.0, 0.01},
+        {0.125, 0.1},
+        {0.125, 0.1042},
+        {0.125, 0.125111111111},
+        {10.0, 10.0002}
+    ],
+    lists:foreach(
+        fun({Expected, Ratio}) -> ?assertEqual(Expected, quantize(Ratio)) end,
+        Cases
+    ).
+
 -endif.
diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl
index 73876888b..0ec07025a 100644
--- a/src/smoosh/test/smoosh_tests.erl
+++ b/src/smoosh/test/smoosh_tests.erl
@@ -3,152 +3,486 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
-%% ==========
-%% Setup
-%% ----------
-
-setup(ChannelType) ->
-    DbName = ?tempdb(),
-    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
-    couch_db:close(Db),
-    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-    smoosh_channel:flush(ChannelPid),
-    ok = config:set("smoosh", "persist", "true", false),
-    ok = config:set(config_section(ChannelType), "min_size", "1", false),
-    ok = config:set(config_section(ChannelType), "min_priority", "1", false),
-    DbName.
-
-teardown(ChannelType, DbName) ->
-    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
-    ok = config:delete("smoosh", "persist", false),
-    ok = config:delete(config_section(DbName), "min_size", false),
-    ok = config:delete(config_section(DbName), "min_priority", false),
-    meck:unload(),
-    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-    smoosh_channel:flush(ChannelPid),
-    ok.
-
-config_section(ChannelType) ->
-    "smoosh." ++ ChannelType.
-
-%% ==========
-%% Tests
-%% ----------
-
+%% EUnit generator for the smoosh integration tests: a suite-wide setup
+%% fixture wrapping a per-test foreach fixture.
 smoosh_test_() ->
     {
-        "Testing smoosh",
+        % Once per suite: start couch/fabric and install meck mocks
+        setup,
+        fun setup_all/0,
+        fun teardown_all/1,
         {
-            setup,
-            fun() -> test_util:start_couch([smoosh]) end,
-            fun test_util:stop/1,
+            % Per test: fresh db and freshly started smoosh (see setup/0)
+            foreach,
+            fun setup/0,
+            fun teardown/1,
             [
-                channels_tests(),
-                persistence_tests()
+                ?TDEF_FE(t_default_channels),
+                ?TDEF_FE(t_channels_recreated_on_crash),
+                ?TDEF_FE(t_can_create_and_delete_channels),
+                ?TDEF_FE(t_db_is_enqueued_and_compacted),
+                ?TDEF_FE(t_view_is_enqueued_and_compacted),
+                ?TDEF_FE(t_index_cleanup_happens_by_default),
+                ?TDEF_FE(t_index_cleanup_can_be_disabled),
+                ?TDEF_FE(t_suspend_resume),
+                ?TDEF_FE(t_check_window_can_resume),
+                ?TDEF_FE(t_renqueue_on_crashes),
+                ?TDEF_FE(t_update_status_works),
+                % Presumably a longer per-test timeout (seconds) -- TODO confirm
+                ?TDEF_FE(t_checkpointing_works, 15),
+                ?TDEF_FE(t_ignore_checkpoint_resume_if_compacted_already),
+                ?TDEF_FE(t_access_cleaner_restarts),
+                ?TDEF_FE(t_event_handler_restarts),
+                ?TDEF_FE(t_manual_enqueue_api_works),
+                ?TDEF_FE(t_access_cleaner_works)
             ]
         }
     }.
 
-persistence_tests() ->
-    Tests = [
-        fun should_persist_queue/2,
-        fun should_call_recover/2,
-        fun should_not_call_recover/2
-    ],
-    {
-        "Various persistence tests",
-        [
-            make_test_case("ratio_dbs", Tests)
-        ]
-    }.
+%% Suite setup: install passthrough meck mocks (tests wait on and intercept
+%% calls through them), start couch with fabric and return its context.
+setup_all() ->
+    Mocked = [smoosh_server, smoosh_channel, fabric, couch_emsort],
+    lists:foreach(fun(Mod) -> meck:new(Mod, [passthrough]) end, Mocked),
+    Ctx = test_util:start_couch([fabric]),
+    config:set("query_server_config", "commit_freq", "0", false),
+    Ctx.
 
-channels_tests() ->
-    Tests = [
-        fun should_enqueue/2
-    ],
-    {
-        "Various channels tests",
+%% Suite teardown, reversing setup_all/0.
+teardown_all(CouchCtx) ->
+    % smoosh may already be stopped, so the result is deliberately ignored
+    catch application:stop(smoosh),
+    config:delete("query_server_config", "commit_freq", false),
+    test_util:stop(CouchCtx),
+    meck:unload().
+
+%% Per-test setup: set smoosh persist=false and wait_secs=0, create a q=1 db
+%% with design doc _design/foo (view bar) and doc1 holding ~2MB of encoded
+%% random data, query the view once, then start smoosh, wait for its 7
+%% default channels and flush. Returns the db name.
+setup() ->
+    config:set("smoosh", "persist", "false", false),
+    config:set("smoosh", "wait_secs", "0", false),
+    DbName = ?tempdb(),
+    % q=1 so shard_name/1 can assume a single shard
+    fabric:create_db(DbName, [{q, 1}]),
+    {ok, _} = create_ddoc(DbName, <<"_design/foo">>, <<"bar">>),
+    {ok, _} = create_doc(DbName, <<"doc1">>, 1500000),
+    {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+    application:start(smoosh),
+    wait_for_channels(),
+    flush(),
+    DbName.
+
+%% Per-test teardown: stop smoosh (best effort), drop the db, clear mock
+%% call history and remove every config key the tests may have set.
+teardown(DbName) ->
+    catch flush(),
+    catch application:stop(smoosh),
+    fabric:delete_db(DbName),
+    lists:foreach(
+        fun(Mod) -> meck:reset(Mod) end,
+        [smoosh_server, smoosh_channel, couch_emsort, fabric]
+    ),
+    lists:foreach(
+        fun({Section, Key}) -> config:delete(Section, Key, false) end,
+        [
+            {"smoosh", "db_channels"},
+            {"smoosh.ratio_dbs", "min_priority"},
+            {"smoosh.ratio_views", "min_priority"},
+            {"smoosh", "view_channels"},
+            {"smoosh", "cleanup_channels"},
+            {"smoosh", "wait_secs"},
+            {"smoosh", "persist"},
+            {"smoosh", "cleanup_index_files"}
+        ]
+    ).
+
+%% status/0 reports exactly the 7 default channels (sorted by name); after
+%% the smoosh application is stopped it returns [] rather than crashing.
+t_default_channels(_) ->
+    ?assertMatch(
         [
-            make_test_case("ratio_dbs", Tests)
-        ]
-    }.
+            {"index_cleanup", _},
+            {"ratio_dbs", _},
+            {"ratio_views", _},
+            {"slack_dbs", _},
+            {"slack_views", _},
+            {"upgrade_dbs", _},
+            {"upgrade_views", _}
+        ],
+        status()
+    ),
+    % If app hasn't started status won't crash
+    application:stop(smoosh),
+    ?assertEqual([], status()).
 
-make_test_case(Type, Funs) ->
-    {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}.
+%% Killing the ratio_dbs channel leads to smoosh_channel:start_link/1 being
+%% called again; all 7 channels return and ratio_dbs has a new pid.
+t_channels_recreated_on_crash(_) ->
+    RatioDbsPid = get_channel_pid("ratio_dbs"),
+    % Clear call history so meck:wait below only counts the restart
+    meck:reset(smoosh_channel),
+    exit(RatioDbsPid, kill),
+    meck:wait(1, smoosh_channel, start_link, 1, 3000),
+    wait_for_channels(7),
+    ?assertMatch([_, {"ratio_dbs", _} | _], status()),
+    ?assertNotEqual(RatioDbsPid, get_channel_pid("ratio_dbs")).
 
-should_enqueue(ChannelType, DbName) ->
-    ?_test(begin
-        ok = grow_db_file(DbName, 300),
-        ok = wait_enqueue(ChannelType, DbName),
-        ?assert(is_enqueued(ChannelType, DbName)),
-        ok
-    end).
+%% Channels named in the db_channels, view_channels and cleanup_channels
+%% config keys appear when the keys are set and disappear (back to the 7
+%% defaults) when the keys are deleted.
+t_can_create_and_delete_channels(_) ->
+    config:set("smoosh", "db_channels", "mychan1", false),
+    config:set("smoosh", "view_channels", "mychan2", false),
+    config:set("smoosh", "cleanup_channels", "mychan3", false),
+    % 7 default ones + 3 new ones
+    wait_for_channels(10),
+    meck:reset(smoosh_channel),
+    config:delete("smoosh", "db_channels", false),
+    config:delete("smoosh", "view_channels", false),
+    config:delete("smoosh", "cleanup_channels", false),
+    wait_for_channels(7).
 
-should_persist_queue(ChannelType, DbName) ->
-    ?_test(begin
-        {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-        ok = grow_db_file(DbName, 300),
-        ok = wait_enqueue(ChannelType, DbName),
-        ok = smoosh_channel:persist(ChannelPid),
-        Q0 = channel_queue(ChannelType),
-        ok = application:stop(smoosh),
-        ok = application:start(smoosh),
-        Q1 = channel_queue(ChannelType),
-        % Assert that queues are not empty
-        ?assertNotEqual(Q0, smoosh_priority_queue:new(ChannelType)),
-        ?assertNotEqual(Q1, smoosh_priority_queue:new(ChannelType)),
-        ?assertEqual(Q0, Q1),
-        ok
-    end).
+%% Deleting the large doc gets the db shard enqueued; a channel then reports
+%% a compaction start ({_, {ok, _}}) and a monitored process exits normally.
+t_db_is_enqueued_and_compacted(DbName) ->
+    % ratio_dbs starts idle: {Active, Starting, Waiting} = {0, 0, 0}
+    ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+    meck:reset(smoosh_channel),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    ok = wait_compact_start(),
+    ok = wait_normal_down().
 
-should_call_recover(_ChannelType, _DbName) ->
-    ?_test(begin
-        ok = application:stop(smoosh),
-        ok = config:set("smoosh", "persist", "true", false),
-        meck:new(smoosh_priority_queue, [passthrough]),
-        ok = application:start(smoosh),
-        timer:sleep(1000),
-        ?assertNotEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
-        ok
-    end).
+%% After the db itself is compacted, querying the view gets the
+%% {Shard, <<"_design/foo">>} view enqueued and compacted.
+t_view_is_enqueued_and_compacted(DbName) ->
+    % We don't want index cleanup to interfere for now
+    config:set("smoosh", "cleanup_index_files", "false", false),
+    % Ensure db is compacted
+    meck:reset(smoosh_channel),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_normal_down(),
+    % Check view
+    meck:reset(smoosh_channel),
+    {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+    ok = wait_to_enqueue({DbName, <<"_design/foo">>}),
+    ok = wait_compact_start(),
+    ok = wait_normal_down().
 
-should_not_call_recover(_ChannelType, _DbName) ->
-    ?_test(begin
-        ok = application:stop(smoosh),
-        ok = config:set("smoosh", "persist", "false", false),
-        meck:new(smoosh_priority_queue, [passthrough]),
-        ok = application:start(smoosh),
-        timer:sleep(1000),
-        ?assertEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
-        ok
-    end).
+%% With cleanup_index_files at its default (true), compacting the db and
+%% querying the view leads to fabric:cleanup_index_files(DbName) being called.
+t_index_cleanup_happens_by_default(DbName) ->
+    ?assert(config:get_boolean("smoosh", "cleanup_index_files", true)),
+    % Db compacts
+    meck:reset(smoosh_channel),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_normal_down(),
+    % Query the view again; reset history so only new calls are observed
+    meck:reset(fabric),
+    meck:reset(smoosh_channel),
+    {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+    % View cleanup should have been invoked
+    meck:wait(fabric, cleanup_index_files, [DbName], 4000).
+
+%% With cleanup_index_files=false the view is still compacted, but
+%% fabric:cleanup_index_files/1 is never called.
+t_index_cleanup_can_be_disabled(DbName) ->
+    config:set("smoosh", "cleanup_index_files", "false", false),
+    % Db compacts
+    meck:reset(smoosh_channel),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_normal_down(),
+    % View should compact as well
+    meck:reset(fabric),
+    meck:reset(smoosh_channel),
+    {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+    ok = wait_compact_start(),
+    ok = wait_normal_down(),
+    % View cleanup was not called (the sleep gives a late cleanup a chance to
+    % show up before asserting its absence)
+    timer:sleep(1000),
+    ?assertEqual(0, meck:num_calls(fabric, cleanup_index_files, 1)).
+
+%% smoosh:suspend/0 suspends the running compactor process while ratio_dbs
+%% still reports it as 1 active job; smoosh:resume/0 un-suspends it. Both
+%% calls may be repeated safely.
+t_suspend_resume(DbName) ->
+    ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+    meck:reset(smoosh_channel),
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    ok = smoosh:suspend(),
+    ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")),
+    % Suspending twice should work too
+    ok = smoosh:suspend(),
+    ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")),
+    ok = smoosh:resume(),
+    ?assertNotEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    % Resuming twice should work too
+    ok = smoosh:resume(),
+    ?assertNotEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    % Release the intercepted compactor so it can finish
+    CompPid ! continue,
+    ok = wait_normal_down().
+
+%% Sends check_window to ratio_dbs while smoosh is suspended, then releases
+%% the compactor and expects a normal exit.
+%% NOTE(review): presumably check_window resumes jobs when inside the
+%% allowed window; this test only asserts the compaction still completes.
+t_check_window_can_resume(DbName) ->
+    ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+    meck:reset(smoosh_channel),
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    ok = smoosh:suspend(),
+    ?assertEqual({status, suspended}, erlang:process_info(CompPid, status)),
+    get_channel_pid("ratio_dbs") ! check_window,
+    CompPid ! continue,
+    ok = wait_normal_down().
+
+%% When the compactor crashes (error:boom raised from the intercept), the db
+%% is enqueued again and a new compactor is started and completes.
+t_renqueue_on_crashes(DbName) ->
+    ?assertEqual({0, 0, 0}, sync_status("ratio_dbs")),
+    meck:reset(smoosh_channel),
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    % Forget the first enqueue so the wait below sees the re-enqueue
+    meck:reset(smoosh_channel),
+    CompPid ! {raise, error, boom},
+    ok = wait_to_enqueue(DbName),
+    CompPid2 = wait_db_compactor_pid(),
+    CompPid2 ! continue,
+    ok = wait_normal_down().
+
+%% While a compaction is paused in the intercept, ratio_dbs status
+%% eventually reports {Active, Starting, Waiting} = {1, 0, 0}.
+t_update_status_works(DbName) ->
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    % status should report 1 active job, but it may not have been updated yet
+    % so we wait until update_status is handled and then poll
+    wait_update_status(),
+    WaitFun = fun() ->
+        case {1, 0, 0} =:= status("ratio_dbs") of
+            true -> ok;
+            _ -> wait
+        end
+    end,
+    test_util:wait(WaitFun),
+    CompPid ! continue,
+    ok = wait_normal_down().
+
+%% With persist=true, a checkpoint taken while a compaction is running lets
+%% smoosh resume that job after an application restart: a new compactor is
+%% started and ratio_dbs reports 1 active job.
+t_checkpointing_works(DbName) ->
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    ChanPid = get_channel_pid("ratio_dbs"),
+    config:set("smoosh", "persist", "true", false),
+    meck:reset(smoosh_channel),
+    ChanPid ! checkpoint,
+    % Wait for checkpoint process to exit
+    ok = wait_normal_down(),
+    % Stop smoosh and then crash the compaction
+    ok = application:stop(smoosh),
+    CompPid ! {raise, error, kapow},
+    % Smoosh should resume job and continue compacting
+    setup_db_compactor_intercept(),
+    meck:reset(smoosh_channel),
+    ok = application:start(smoosh),
+    CompPid2 = wait_db_compactor_pid(),
+    ?assertEqual({1, 0, 0}, sync_status("ratio_dbs")),
+    CompPid2 ! continue,
+    ok = wait_normal_down().
+
+%% If the checkpointed compaction finishes while smoosh is stopped, the job
+%% recovered from the checkpoint must not start another compaction after
+%% smoosh restarts.
+t_ignore_checkpoint_resume_if_compacted_already(DbName) ->
+    setup_db_compactor_intercept(),
+    {ok, _} = delete_doc(DbName, <<"doc1">>),
+    ok = wait_to_enqueue(DbName),
+    CompPid = wait_db_compactor_pid(),
+    ChanPid = get_channel_pid("ratio_dbs"),
+    config:set("smoosh", "persist", "true", false),
+    meck:reset(smoosh_channel),
+    ChanPid ! checkpoint,
+    % Wait for checkpoint process to exit
+    ok = wait_normal_down(),
+    % Stop smoosh and then let the compaction finish
+    ok = application:stop(smoosh),
+    Ref = erlang:monitor(process, CompPid),
+    CompPid ! continue,
+    % Accept any exit reason and assert on it, so an abnormal exit (or noproc
+    % if the compactor already died) fails right away instead of hanging
+    receive
+        {'DOWN', Ref, process, CompPid, Reason} -> ?assertEqual(normal, Reason)
+    after 5000 ->
+        error({timeout, waiting_for_compactor_exit})
+    end,
+    % Smoosh should resume job and *not* compact
+    setup_db_compactor_intercept(),
+    meck:reset(smoosh_channel),
+    ok = application:start(smoosh),
+    timer:sleep(500),
+    StartPat = {'_', {ok, '_'}},
+    ?assertEqual(0, meck:num_calls(smoosh_channel, handle_info, [StartPat, '_'])).
+
+%% Killing the access cleaner results in smoosh_server's state holding a
+%% different access cleaner pid.
+t_access_cleaner_restarts(_) ->
+    OldPid = get_access_cleaner_pid(),
+    exit(OldPid, kill),
+    test_util:wait(fun() ->
+        case get_access_cleaner_pid() of
+            OldPid -> wait;
+            _NewPid -> ok
+        end
+    end),
+    ?assertNotEqual(OldPid, get_access_cleaner_pid()).
+
+%% Killing the event handler results in smoosh_server's state holding a
+%% different event handler pid.
+t_event_handler_restarts(_) ->
+    OldPid = get_event_handler_pid(),
+    exit(OldPid, kill),
+    test_util:wait(fun() ->
+        case get_event_handler_pid() of
+            OldPid -> wait;
+            _NewPid -> ok
+        end
+    end),
+    % Compare against the event handler pid; comparing against the access
+    % cleaner pid (as before) could never fail
+    ?assertNotEqual(OldPid, get_event_handler_pid()).
 
-grow_db_file(DbName, SizeInKb) ->
-    {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
-    Data = b64url:encode(crypto:strong_rand_bytes(SizeInKb * 1024)),
-    Body = {[{<<"value">>, Data}]},
-    Doc = #doc{id = <<"doc1">>, body = Body},
-    {ok, _} = couch_db:update_doc(Db, Doc, []),
-    couch_db:close(Db),
-    ok.
-
-is_enqueued(ChannelType, DbName) ->
-    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
-    smoosh_channel:is_key(ChannelPid, DbName).
-
-wait_enqueue(ChannelType, DbName) ->
-    test_util:wait(
-        fun() ->
-            case is_enqueued(ChannelType, DbName) of
-                false ->
-                    wait;
-                true ->
-                    ok
-            end
+%% The manual enqueue APIs (smoosh_server:sync_enqueue/1, smoosh:enqueue/1,
+%% enqueue_all_dbs/0, enqueue_all_views/0) return ok for valid and invalid
+%% targets, including 1000 rapid repeats, without crashing smoosh_server or
+%% the ratio_dbs, ratio_views and index_cleanup channels.
+t_manual_enqueue_api_works(DbName) ->
+    Shard = shard_name(DbName),
+
+    SmooshPid = whereis(smoosh_server),
+    RatioDbsPid = get_channel_pid("ratio_dbs"),
+    RatioViewsPid = get_channel_pid("ratio_views"),
+    CleanupPid = get_channel_pid("index_cleanup"),
+
+    % Lower min priority so that enqueued shards would try to compact
+    config:set("smoosh.ratio_dbs", "min_priority", "1", false),
+    config:set("smoosh.ratio_views", "min_priority", "1", false),
+
+    % Targets that don't exist must still be accepted
+    ?assertEqual(ok, smoosh_server:sync_enqueue(<<"invalid">>)),
+    ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, <<"invalid">>})),
+    ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/invalid">>})),
+
+    ?assertEqual(ok, smoosh_server:sync_enqueue(Shard)),
+    ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, Shard})),
+    ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/foo">>})),
+
+    ?assertEqual(ok, smoosh:enqueue(Shard)),
+    ?assertEqual(ok, smoosh:enqueue({index_cleanup, Shard})),
+    ?assertEqual(ok, smoosh:enqueue({Shard, <<"_design/foo">>})),
+
+    smoosh:enqueue_all_dbs(),
+    smoosh:enqueue_all_views(),
+
+    % Enqueuing the same items in a loop should work
+    lists:foreach(
+        fun(_) ->
+            ?assertEqual(ok, smoosh:enqueue(Shard)),
+            ?assertEqual(ok, smoosh:enqueue({index_cleanup, Shard})),
+            ?assertEqual(ok, smoosh:enqueue({Shard, <<"_design/foo">>}))
         end,
-        15000
-    ).
+        lists:seq(1, 1000)
+    ),
+
+    % Synchronous calls last, so the async casts above have been sent first
+    ?assertEqual(ok, smoosh_server:sync_enqueue(Shard)),
+    ?assertEqual(ok, smoosh_server:sync_enqueue({index_cleanup, Shard})),
+    ?assertEqual(ok, smoosh_server:sync_enqueue({Shard, <<"_design/foo">>})),
+
+    % Assert that channels and smoosh server didn't crash
+    ?assertEqual(SmooshPid, whereis(smoosh_server)),
+    ?assertEqual(RatioDbsPid, get_channel_pid("ratio_dbs")),
+    ?assertEqual(RatioViewsPid, get_channel_pid("ratio_views")),
+    ?assertEqual(CleanupPid, get_channel_pid("index_cleanup")).
+
+%% An entry in the smoosh_access table stamped an hour in the past is removed
+%% by the periodic access cleaner (assumes 3600s exceeds the configured
+%% staleness -- confirm if that default changes).
+t_access_cleaner_works(_) ->
+    HourAgoSec = erlang:monotonic_time(second) - 3600,
+    ets:insert(smoosh_access, {<<"db1">>, HourAgoSec}),
+    IsEmpty = fun() -> ets:tab2list(smoosh_access) == [] end,
+    test_util:wait(fun() ->
+        case IsEmpty() of
+            true -> ok;
+            false -> wait
+        end
+    end),
+    ?assertEqual([], ets:tab2list(smoosh_access)).
+
+%% Write doc DocId whose "value" field is Size random bytes, b64url-encoded.
+create_doc(DbName, DocId, Size) ->
+    Body = {[{<<"value">>, b64url:encode(crypto:strong_rand_bytes(Size))}]},
+    fabric:update_doc(DbName, #doc{id = DocId, body = Body}, [?ADMIN_CTX]).
+
+%% Write a JavaScript design doc DocId (autoupdate disabled) with a single
+%% view ViewName that emits {doc._id, doc.value}.
+create_ddoc(DbName, DocId, ViewName) ->
+    View = {[{<<"map">>, <<"function(doc) {emit(doc._id, doc.value);}">>}]},
+    Props = [
+        {<<"_id">>, DocId},
+        {<<"language">>, <<"javascript">>},
+        {<<"autoupdate">>, false},
+        {<<"views">>, {[{ViewName, View}]}}
+    ],
+    DDoc = couch_doc:from_json_obj({Props}),
+    fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]).
+
+%% Delete any doc (not just design docs) by writing an empty-bodied
+%% deleted revision on top of the current one.
+delete_doc(DbName, DocId) ->
+    {ok, Current} = fabric:open_doc(DbName, DocId, [?ADMIN_CTX]),
+    Tombstone = Current#doc{deleted = true, body = {[]}},
+    fabric:update_doc(DbName, Tombstone, [?ADMIN_CTX]).
+
+%% Per-channel entries from smoosh:status/0, sorted by channel name.
+status() ->
+    {ok, ChannelStatuses} = smoosh:status(),
+    lists:keysort(1, ChannelStatuses).
+
+%% {Active, Starting, Waiting} job counts for Channel (Waiting is the size of
+%% its waiting queue), or false if status/0 has no such channel.
+status(Channel) ->
+    case lists:keyfind(Channel, 1, status()) of
+        {_, Props} ->
+            Active = proplists:get_value(active, Props),
+            Starting = proplists:get_value(starting, Props),
+            WaitingInfo = proplists:get_value(waiting, Props),
+            Waiting = proplists:get_value(size, WaitingInfo),
+            {Active, Starting, Waiting};
+        false ->
+            false
+    end.
+
+%% Like status/1, but first makes a synchronous call to the channel so
+%% messages it received earlier have presumably been handled.
+sync_status(Channel) ->
+    ChanPid = get_channel_pid(Channel),
+    _ = gen_server:call(ChanPid, get_status_table, infinity),
+    status(Channel).
+
+%% Call smoosh_server:flush/0, crashing unless it returns ok.
+flush() ->
+    ok = smoosh_server:flush(),
+    ok.
+
+%% Channel pid from the smoosh_server ets table; badmatch if Chan is absent.
+get_channel_pid(Chan) ->
+    [{channel, _Name, ChanPid, _}] = ets:lookup(smoosh_server, Chan),
+    ChanPid.
+
+%% Access cleaner pid read from smoosh_server's state. Tied to the #state{}
+%% layout: 7 fields, the access cleaner pid being the last one.
+get_access_cleaner_pid() ->
+    ServerState = sys:get_state(smoosh_server),
+    {state, _, _, _, _, _, _, CleanerPid} = ServerState,
+    CleanerPid.
+
+%% Event handler pid read from smoosh_server's state. Tied to the #state{}
+%% layout: 7 fields, the event handler pid being the 4th.
+get_event_handler_pid() ->
+    ServerState = sys:get_state(smoosh_server),
+    {state, _, _, _, HandlerPid, _, _, _} = ServerState,
+    HandlerPid.
+
+%% Wait until all default channels are running: 3 db channels (ratio_dbs,
+%% slack_dbs, upgrade_dbs) + 3 view channels (ratio_views, slack_views,
+%% upgrade_views) + 1 cleanup channel (index_cleanup) = 7.
+wait_for_channels() ->
+    wait_for_channels(7).
+
+%% Poll status/0 via test_util:wait/1 until exactly N channels are listed.
+wait_for_channels(N) when is_integer(N), N >= 0 ->
+    test_util:wait(fun() ->
+        case length(status()) =:= N of
+            true -> ok;
+            false -> wait
+        end
+    end).
+
+%% Wait for a channel to receive the enqueue cast for a db, a db view or an
+%% index cleanup. Clustered db names are mapped to their shard name, except
+%% for index_cleanup.
+%% NOTE(review): the index_cleanup clause passes DbName unmapped -- confirm.
+wait_to_enqueue({index_cleanup, DbName}) when is_binary(DbName) ->
+    wait_enqueue({index_cleanup, DbName});
+wait_to_enqueue({DbName, View}) when is_binary(DbName) ->
+    wait_enqueue({shard_name(DbName), View});
+wait_to_enqueue(DbName) when is_binary(DbName) ->
+    wait_enqueue(shard_name(DbName)).
+
+%% Wait up to 4s for smoosh_channel:handle_cast({enqueue, Obj, _}, _).
+wait_enqueue(Obj) ->
+    meck:wait(smoosh_channel, handle_cast, [{enqueue, Obj, '_'}, '_'], 4000).
+
+%% Name of DbName's only shard; badmatch unless mem3:shards/1 returns exactly
+%% one shard.
+shard_name(DbName) ->
+    [OnlyShard] = mem3:shards(DbName),
+    mem3:name(OnlyShard).
+
+%% Wait up to 4s for a channel to handle a {_, {ok, _}} message, presumably
+%% the reply that a compaction job has started.
+wait_compact_start() ->
+    meck:wait(smoosh_channel, handle_info, [{'_', {ok, '_'}}, '_'], 4000).
+
+%% Wait up to 4s for a channel to handle a 'DOWN' message with reason normal.
+wait_normal_down() ->
+    meck:wait(smoosh_channel, handle_info, [{'DOWN', '_', '_', '_', normal}, '_'], 4000).
+
+%% Wait up to 4s for a channel to handle the update_status message.
+wait_update_status() ->
+    HandleInfoArgs = [update_status, '_'],
+    meck:wait(smoosh_channel, handle_info, HandleInfoArgs, 4000).
+
+%% Mock couch_emsort:open/1. The calling process (presumably the db compactor)
+%% sends {compactor_paused, Pid} to the test process and then blocks until it
+%% receives continue (run the real open) or {raise, Class, Reason} (raise
+%% that exception instead).
+setup_db_compactor_intercept() ->
+    TestProc = self(),
+    Intercept = fun(Fd) ->
+        TestProc ! {compactor_paused, self()},
+        receive
+            continue -> meck:passthrough([Fd]);
+            {raise, Class, Reason} -> meck:exception(Class, Reason)
+        end
+    end,
+    meck:expect(couch_emsort, open, Intercept).
 
-channel_queue(ChannelType) ->
-    Q0 = smoosh_priority_queue:new(ChannelType),
-    smoosh_priority_queue:recover(Q0).
+%% Wait for a compactor paused in the couch_emsort:open/1 intercept (see
+%% setup_db_compactor_intercept/0) and return its pid. Fails with a
+%% descriptive error after 10s instead of blocking indefinitely.
+wait_db_compactor_pid() ->
+    receive
+        {compactor_paused, Pid} ->
+            Pid
+    after 10000 ->
+        error({timeout, waiting_for_compactor_pid})
+    end.