You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2021/12/15 23:47:27 UTC

[couchdb] 01/04: Add smoosh queue persistence

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch pr-3766
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1a8772fc58ac23dabdfc2135f7461e7c522ae548
Author: ncshaw <nc...@ibm.com>
AuthorDate: Mon Nov 29 18:04:59 2021 -0500

    Add smoosh queue persistence
---
 rel/overlay/etc/default.ini                     |   6 +
 src/smoosh/src/smoosh_channel.erl               | 177 ++++++++++++++++++++----
 src/smoosh/src/smoosh_priority_queue.erl        | 118 +++++++++++++---
 src/smoosh/src/smoosh_server.erl                |  23 +--
 src/smoosh/test/smoosh_priority_queue_tests.erl | 146 +++++++++++++++++++
 src/smoosh/test/smoosh_tests.erl                | 127 +++++++++++++++++
 6 files changed, 546 insertions(+), 51 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 93aa1ca..0994d08 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -639,6 +639,12 @@ partitioned||* = true
 ;[smoosh.slack_views]
 ;priority = slack
 ;min_priority = 536870912
+;
+; Directory to store the state of the smoosh priority queue
+;state_dir = .
+;
+; Interval between writes of state of smoosh priority queue to save file
+;state_checkpoint_interval_in_sec = 180
 
 [ioq]
 ; The maximum number of concurrent in-flight IO requests that
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index 06849ac..14a1216 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -17,7 +17,7 @@
 
 % public api.
 -export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
--export([enqueue/3, last_updated/2, flush/1]).
+-export([enqueue/3, last_updated/2, flush/1, is_key/2]).
 
 % gen_server api.
 -export([
@@ -25,18 +25,22 @@
     handle_call/3,
     handle_cast/2,
     handle_info/2,
-    code_change/3,
     terminate/2
 ]).
 
+-define(DEFAULT_CHECKPOINT_INTERVAL_IN_SEC, 180).
+
+-define(VSN, 1).
+
 % records.
 
 -record(state, {
     active = [],
     name,
-    waiting = smoosh_priority_queue:new(),
+    waiting,
     paused = true,
-    starting = []
+    starting = [],
+    opened = false
 }).
 
 % public functions.
@@ -65,19 +69,26 @@ close(ServerRef) ->
 flush(ServerRef) ->
     gen_server:call(ServerRef, flush).
 
+is_key(ServerRef, Key) ->
+    gen_server:call(ServerRef, {is_key, Key}).
+
 % gen_server functions.
 
 init(Name) ->
     schedule_unpause(),
     erlang:send_after(60 * 1000, self(), check_window),
-    {ok, #state{name = Name}}.
+    process_flag(trap_exit, true),
+    Queue = smoosh_priority_queue:new(Name),
+    State = #state{name = Name, waiting = Queue},
+    ok = gen_server:cast(self(), init),
+    {ok, State}.
 
 handle_call({last_updated, Object}, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
     {reply, LastUpdated, State};
 handle_call(suspend, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     #state{active = Active} = State,
     [
         catch erlang:suspend_process(Pid, [unless_suspending])
@@ -85,12 +96,12 @@ handle_call(suspend, _From, State0) ->
     ],
     {reply, ok, State#state{paused = true}};
 handle_call(resume, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     #state{active = Active} = State,
     [catch erlang:resume_process(Pid) || {_, Pid} <- Active],
     {reply, ok, State#state{paused = false}};
 handle_call(status, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     {reply,
         {ok, [
             {active, length(State#state.active)},
@@ -99,17 +110,28 @@ handle_call(status, _From, State0) ->
         ]},
         State};
 handle_call(close, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
-    {stop, normal, ok, State};
+    State = maybe_open_queue(State0),
+    #state{waiting = Q} = State,
+    smoosh_priority_queue:close(Q),
+    {stop, normal, ok, State#state{waiting = nil, opened = false}};
 handle_call(flush, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
-    {reply, ok, State#state{waiting = smoosh_priority_queue:new()}}.
+    #state{waiting = Q} = State = maybe_open_queue(State0),
+    {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}};
+handle_call({is_key, Key}, _From, State0) ->
+    State = maybe_open_queue(State0),
+    #state{waiting = Waiting} = State,
+    {reply, smoosh_priority_queue:is_key(Key, Waiting), State}.
 
+handle_cast(init, State0) ->
+    State1 = maybe_recover_state(State0),
+    State2 = maybe_open_queue(State1),
+    {noreply, State3} = handle_info(persist_queue, State2),
+    {noreply, State3};
 handle_cast({enqueue, _Object, 0}, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     {noreply, State};
 handle_cast({enqueue, Object, Priority}, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}.
 
 % We accept noproc here due to possibly having monitored a restarted compaction
@@ -118,7 +140,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
     Reason == normal;
     Reason == noproc
 ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     #state{active = Active, starting = Starting} = State,
     {noreply,
         maybe_start_compaction(
@@ -128,7 +150,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
             }
         )};
 handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     #state{active = Active0, starting = Starting0} = State,
     case lists:keytake(Job, 2, Active0) of
         {value, {Key, _Pid}, Active1} ->
@@ -151,7 +173,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
             end
     end;
 handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     case lists:keytake(Ref, 1, State#state.starting) of
         {value, {_, Key}, Starting1} ->
             couch_log:notice(
@@ -168,7 +190,7 @@ handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
             {noreply, State}
     end;
 handle_info(check_window, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     #state{paused = Paused, name = Name} = State,
     StrictWindow = smoosh_utils:get(Name, "strict_window", "false"),
     FinalState =
@@ -194,21 +216,128 @@ handle_info(check_window, State0) ->
         end,
     erlang:send_after(60 * 1000, self(), check_window),
     {noreply, FinalState};
+handle_info(persist_queue, State0) ->
+    #state{waiting = Queue} = State0,
+    write_state_to_file(State0),
+    smoosh_priority_queue:write_to_file(Queue),
+    Checkpoint =
+        config:get_integer(
+            "smoosh", "state_checkpoint_interval_in_sec", ?DEFAULT_CHECKPOINT_INTERVAL_IN_SEC
+        ) * 1000,
+    erlang:send_after(Checkpoint, self(), persist_queue),
+    {noreply, State0};
 handle_info(pause, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     {noreply, State#state{paused = true}};
 handle_info(unpause, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    State = maybe_open_queue(State0),
     {noreply, maybe_start_compaction(State#state{paused = false})}.
 
-terminate(_Reason, _State) ->
+terminate(_Reason, #state{name = Name, waiting = Q}) ->
+    file:delete(active_file_name(Name)),
+    file:delete(starting_file_name(Name)),
+    if
+        Q =/= nil ->
+            smoosh_priority_queue:close(Q);
+        true ->
+            nil
+    end,
     ok.
 
-code_change(_OldVsn, #state{} = State, _Extra) ->
-    {ok, State}.
+maybe_recover_state(#state{name = Name} = State) ->
+    Active = recover(active_file_name(Name)),
+    Starting = recover(starting_file_name(Name)),
+    DatabaseDir = config:get("couchdb", "database_dir"),
+    ViewDir = config:get("couchdb", "view_index_dir"),
+    Active1 = get_matching_compact_files(DatabaseDir, Active),
+    Active2 = get_matching_compact_files(ViewDir, Active),
+    Active3 = Active1 ++ Active2,
+    State#state{active = Active3, starting = Starting}.
+
+get_matching_compact_files(Dir, Active) ->
+    MatchingFiles = filelib:fold_files(
+        Dir,
+        "^[a-zA-Z0-9_.-]*.compact$",
+        true,
+        (fun(FilePath, Acc) ->
+            FilePrefix = filename:rootname(FilePath, ".compact"),
+            case lists:keyfind(FilePrefix, 1, Active) of
+                false ->
+                    Acc;
+                Tuple ->
+                    [Tuple | Acc]
+            end
+        end),
+        []
+    ),
+    lists:reverse(MatchingFiles).
+
+recover(FilePath) ->
+    case do_recover(FilePath) of
+        {ok, List} ->
+            List;
+        error ->
+            []
+    end.
+
+do_recover(FilePath) ->
+    case file:read_file(FilePath) of
+        {ok, Content} ->
+            <<Vsn, Binary/binary>> = Content,
+            try parse_state(Vsn, ?VSN, Binary) of
+                Term ->
+                    {ok, Term}
+            catch
+                error:Reason ->
+                    couch_log:error(
+                        "~p Invalid state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
+                    ),
+                    file:delete(FilePath),
+                    error
+            end;
+        {error, enoent} ->
+            couch_log:notice(
+                "~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
+            ),
+            error;
+        {error, Reason} ->
+            couch_log:error(
+                "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
+            ),
+            file:delete(FilePath),
+            error
+    end.
+
+parse_state(1, ?VSN, Binary) ->
+    erlang:binary_to_term(Binary, [safe]);
+parse_state(Vsn, ?VSN, _) ->
+    error({unsupported_version, Vsn}).
+
+write_state_to_file(#state{name = Name, active = Active, starting = Starting}) ->
+    write_to_file(Active, active_file_name(Name)),
+    write_to_file(Starting, starting_file_name(Name)).
+
+write_to_file(List, FileName) ->
+    OnDisk = <<?VSN, (erlang:term_to_binary(List, [compressed, {minor_version, 1}]))/binary>>,
+    TmpFileName = FileName ++ ".tmp",
+    file:delete(TmpFileName),
+    file:write_file(TmpFileName, OnDisk, [sync]),
+    file:delete(FileName),
+    file:rename(TmpFileName, FileName).
+
+active_file_name(Name) ->
+    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").
+
+starting_file_name(Name) ->
+    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").
 
 % private functions.
 
+maybe_open_queue(#state{opened = true} = State) ->
+    State;
+maybe_open_queue(#state{opened = false, waiting = Queue} = State) ->
+    State#state{waiting = smoosh_priority_queue:open(Queue), opened = true}.
+
 add_to_queue(Key, Priority, State) ->
     #state{active = Active, waiting = Q} = State,
     case lists:keymember(Key, 1, Active) of
diff --git a/src/smoosh/src/smoosh_priority_queue.erl b/src/smoosh/src/smoosh_priority_queue.erl
index b6f4b6d..6492f02 100644
--- a/src/smoosh/src/smoosh_priority_queue.erl
+++ b/src/smoosh/src/smoosh_priority_queue.erl
@@ -12,32 +12,70 @@
 
 -module(smoosh_priority_queue).
 
--export([new/0, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+-export([new/1]).
+
+-export([open/1, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+
+-export([flush/1, close/1]).
+
+-export([from_list/2, to_list/1]).
+
+-export([file_name/1, write_to_file/1]).
+
+-define(VSN, 1).
 
 -record(priority_queue, {
-    dict = dict:new(),
-    tree = gb_trees:empty()
+    name,
+    map,
+    tree
 }).
 
-new() ->
-    #priority_queue{}.
+new(Name) ->
+    #priority_queue{name=Name, map=maps:new(), tree=gb_trees:empty()}.
+
+open(#priority_queue{name=Name} = Q) ->
+    case do_open(file_name(Q)) of
+        {ok, Terms} ->
+            Tree = maps:fold(fun(Key, {TreeKey, Value}, TreeAcc) ->
+                gb_trees:enter(TreeKey, {Key, Value}, TreeAcc)
+            end, gb_trees:empty(), Terms),
+            #priority_queue{name=Name, map=Terms, tree=Tree};
+        error ->
+            Q
+    end.
+
+write_to_file(#priority_queue{map=Map} = Q) ->
+    OnDisk = <<?VSN, (erlang:term_to_binary(Map, [compressed, {minor_version, 1}]))/binary>>,
+    FileName = file_name(Q),
+    TmpFileName = FileName ++ ".tmp",
+    file:delete(TmpFileName),
+    ok = file:write_file(TmpFileName, OnDisk, [sync]),
+    file:delete(FileName),
+    ok = file:rename(TmpFileName, FileName).
 
-last_updated(Key, #priority_queue{dict = Dict}) ->
-    case dict:find(Key, Dict) of
+flush(#priority_queue{name=Name} = Q) ->
+    file:delete(file_name(Q)),
+    Q#priority_queue{name=Name, map=maps:new(), tree=gb_trees:empty()}.
+
+close(#priority_queue{} = Q) ->
+    file:delete(file_name(Q)).
+
+last_updated(Key, #priority_queue{map=Map}) ->
+    case maps:find(Key, Map) of
         {ok, {_Priority, {LastUpdatedMTime, _MInt}}} ->
             LastUpdatedMTime;
         error ->
             false
     end.
 
-is_key(Key, #priority_queue{dict = Dict}) ->
-    dict:is_key(Key, Dict).
+is_key(Key, #priority_queue{map=Map}) ->
+    maps:is_key(Key, Map).
 
 in(Key, Value, Priority, Q) ->
     in(Key, Value, Priority, infinity, Q).
 
-in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) ->
-    Tree1 =
+in(Key, Value, Priority, Capacity, #priority_queue{name=Name, map=Map, tree=Tree}) ->
+    Tree1 = case maps:find(Key, Map) of
         case dict:find(Key, Dict) of
             {ok, TreeKey} ->
                 gb_trees:delete_any(TreeKey, Tree);
@@ -47,23 +85,24 @@ in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) ->
     Now = {erlang:monotonic_time(), erlang:unique_integer([monotonic])},
     TreeKey1 = {Priority, Now},
     Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1),
-    Dict1 = dict:store(Key, TreeKey1, Dict),
-    truncate(Capacity, #priority_queue{dict = Dict1, tree = Tree2}).
+    Map1 = maps:put(Key, TreeKey1, Map),
+    truncate(Capacity, #priority_queue{name=Name, map=Map1, tree=Tree2}).
 
-out(#priority_queue{dict = Dict, tree = Tree}) ->
+out(#priority_queue{name=Name, map=Map, tree=Tree}) ->
     case gb_trees:is_empty(Tree) of
         true ->
             false;
         false ->
             {_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree),
-            Dict1 = dict:erase(Key, Dict),
-            {Key, Value, #priority_queue{dict = Dict1, tree = Tree1}}
+            Map1 = maps:remove(Key, Map),
+            Q = #priority_queue{name=Name, map=Map1, tree=Tree1},
+            {Key, Value, Q}
     end.
 
 size(#priority_queue{tree = Tree}) ->
     gb_trees:size(Tree).
 
-info(#priority_queue{tree = Tree} = Q) ->
+info(#priority_queue{tree=Tree} = Q) ->
     [
         {size, ?MODULE:size(Q)}
         | case gb_trees:is_empty(Tree) of
@@ -76,6 +115,17 @@ info(#priority_queue{tree = Tree} = Q) ->
         end
     ].
 
+from_list(Orddict, #priority_queue{name=Name}) ->
+    Map = maps:from_list(Orddict),
+    Tree = gb_trees:from_orddict(Orddict),
+    #priority_queue{name=Name, map=Map, tree=Tree}.
+
+to_list(#priority_queue{tree=Tree}) ->
+    gb_trees:to_list(Tree).
+
+file_name(#priority_queue{name=Name}) ->
+    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".queue").
+
 truncate(infinity, Q) ->
     Q;
 truncate(Capacity, Q) when Capacity > 0 ->
@@ -83,7 +133,37 @@ truncate(Capacity, Q) when Capacity > 0 ->
 
 truncate(Capacity, Size, Q) when Size =< Capacity ->
     Q;
-truncate(Capacity, Size, #priority_queue{dict = Dict, tree = Tree}) when Size > 0 ->
+truncate(Capacity, Size, #priority_queue{name=Name, map=Map, tree=Tree}) when Size > 0 ->
     {_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree),
-    Q1 = #priority_queue{dict = dict:erase(Key, Dict), tree = Tree1},
+    Q1 = #priority_queue{name=Name, map=maps:remove(Key, Map), tree=Tree1},
     truncate(Capacity, ?MODULE:size(Q1), Q1).
+
+do_open(FilePath) ->
+    case file:read_file(FilePath) of
+        {ok, Content} ->
+            <<Vsn, Binary/binary>> = Content,
+            try parse_queue(Vsn, ?VSN, Binary) of
+                Bin -> 
+                    {ok, Bin}
+            catch
+                error:Reason ->
+                    couch_log:error(
+                        "~p Invalid queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath]),
+                    file:delete(FilePath),
+                    error
+            end;
+        {error, enoent} ->
+            couch_log:notice(
+                "~p (~p) Queue file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]),
+            error;
+        {error, Reason} ->
+            couch_log:error(
+                "~p Cannot read the queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath]),
+            file:delete(FilePath),
+            error
+    end.
+
+parse_queue(1, ?VSN, Binary) -> 
+    erlang:binary_to_term(Binary, [safe]);
+parse_queue(Vsn, ?VSN, _) ->
+    error({unsupported_version, Vsn}).
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 0526625..e89412f 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -46,6 +46,9 @@
 % exported but for internal use.
 -export([enqueue_request/2]).
 
+% exported for testing and debugging
+-export([get_channel/1]).
+
 -ifdef(TEST).
 -define(RELISTEN_DELAY, 50).
 -else.
@@ -60,7 +63,7 @@
     schema_channels = [],
     tab,
     event_listener,
-    waiting = dict:new()
+    waiting=maps:new()
 }).
 
 -record(channel, {
@@ -106,6 +109,10 @@ handle_db_event(DbName, {schema_updated, DDocId}, St) ->
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
 
+% for testing and debugging only
+get_channel(ChannelName) ->
+    gen_server:call(?MODULE, {get_channel, ChannelName}).
+
 % gen_server functions.
 
 init([]) ->
@@ -175,10 +182,10 @@ handle_call(resume, _From, State) ->
             couch_log:notice("Resuming ~p", [Name]),
             smoosh_channel:resume(P)
         end,
-        0,
-        State#state.tab
-    ),
-    {reply, ok, State}.
+    {reply, ok, State};
+
+handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) ->
+    {reply, {ok, channel_pid(Tab, ChannelName)}, State}.
 
 handle_cast({new_db_channels, Channels}, State) ->
     [
@@ -200,12 +207,12 @@ handle_cast({new_schema_channels, Channels}, State) ->
     {noreply, create_missing_channels(State#state{view_channels = Channels})};
 handle_cast({enqueue, Object}, State) ->
     #state{waiting = Waiting} = State,
-    case dict:is_key(Object, Waiting) of
+    case maps:is_key(Object, Waiting) of
         true ->
             {noreply, State};
         false ->
             {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]),
-            {noreply, State#state{waiting = dict:store(Object, Ref, Waiting)}}
+            {noreply, State#state{waiting=maps:put(Object, Ref, Waiting)}}
     end.
 
 handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
@@ -222,7 +229,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
     end,
     {noreply, create_missing_channels(State)};
 handle_info({'DOWN', Ref, _, _, _}, State) ->
-    Waiting = dict:filter(
+    Waiting = maps:filter(fun(_Key, Value) -> Value =/= Ref end,
         fun(_Key, Value) -> Value =/= Ref end,
         State#state.waiting
     ),
diff --git a/src/smoosh/test/smoosh_priority_queue_tests.erl b/src/smoosh/test/smoosh_priority_queue_tests.erl
new file mode 100644
index 0000000..668b8af
--- /dev/null
+++ b/src/smoosh/test/smoosh_priority_queue_tests.erl
@@ -0,0 +1,146 @@
+-module(smoosh_priority_queue_tests).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(PROP_PREFIX, "prop_").
+
+-define(CAPACITY, 3).
+
+-define(RANDOM_FILE, lists:flatten(io_lib:format("~p", [erlang:timestamp()]))).
+
+setup() ->
+    Ctx = test_util:start_couch(),
+    Ctx.
+
+teardown(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+smoosh_priority_queue_test_() ->
+    {
+        "smoosh priority queue test",
+        {
+            setup,
+            fun setup/0, fun teardown/1,
+            [
+                fun prop_inverse_test_/0,
+                fun no_halt_on_corrupted_file_test/0,
+                fun no_halt_on_missing_file_test/0
+            ]
+        }
+    }.
+
+%% ==========
+%% Tests
+%% ----------
+
+%% define all tests to be able to run them individually
+prop_inverse_test_() -> ?_test(begin
+        test_property(prop_inverse)
+    end).
+
+no_halt_on_corrupted_file_test() ->
+    ?_test(begin
+        Name = ?RANDOM_FILE,
+        Q0 = smoosh_priority_queue:new(Name),
+        Q = smoosh_priority_queue:open(Q0),
+        FilePath = smoosh_priority_queue:file_name(Q),
+        ok = file:write_file(FilePath, <<"garbage">>),
+        ?assertEqual(Q, smoosh_priority_queue:open(Q0)),
+        ok
+    end).
+
+no_halt_on_missing_file_test() ->
+    ?_test(begin
+        Name = ?RANDOM_FILE,
+        Q0 = smoosh_priority_queue:new(Name),
+        Q = smoosh_priority_queue:open(Q0),
+        FilePath = smoosh_priority_queue:file_name(Q),
+        ok = file:delete(FilePath),
+        ?assertEqual(Q, smoosh_priority_queue:open(Q0)),
+        ok
+    end).
+
+%% ==========
+%% Properties
+%% ----------
+
+prop_inverse() ->
+    ?FORALL(Q, queue(),
+        begin
+            List = smoosh_priority_queue:to_list(Q),
+            equal(Q, smoosh_priority_queue:from_list(List, Q))
+        end).
+
+%% ==========
+%% Generators
+%% ----------
+
+key() -> proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
+value() -> proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
+priority() -> integer().
+item() -> {key(), value(), priority()}.
+
+items_list() ->
+    ?LET(L, list(item()), L).
+
+simple_queue() ->
+    ?LET(L, items_list(),
+        from_list(L)).
+
+with_deleted() ->
+    ?LET(Q,
+        ?LET({{K0, V0, P0}, Q0}, {item(), simple_queue()},
+             smoosh_priority_queue:in(K0, V0, P0, ?CAPACITY, Q0)),
+        frequency([
+            {1, Q},
+            {2, element(3, smoosh_priority_queue:out(Q))}
+        ])).
+
+queue() ->
+    with_deleted().
+
+%% ==========================
+%% Proper related boilerplate
+%% --------------------------
+
+test_property(Property) when is_atom(Property) ->
+    test_property({atom_to_list(Property), Property});
+test_property({Id, Property}) ->
+    Name = string:sub_string(Id, length(?PROP_PREFIX) + 1),
+    Opts = [long_result, {numtests, 1000}, {to_file, user}],
+    {Name, {timeout, 60, fun() -> test_it(Property, Opts) end}}.
+
+test_it(Property, Opts) ->
+    case proper:quickcheck(?MODULE:Property(), Opts) of
+        true -> true;
+        Else -> erlang:error(
+            {propertyFailed, [
+                  {module, ?MODULE},
+                  {property, Property},
+                  {result, Else}]})
+    end.
+
+%% ================
+%% Helper functions
+%% ----------------
+
+new() ->
+    Q = smoosh_priority_queue:new("foo"),
+    smoosh_priority_queue:open(Q).
+
+from_list(List) ->
+    lists:foldl(fun({Key, Value, Priority}, Queue) ->
+       smoosh_priority_queue:in(Key, Value, Priority, ?CAPACITY, Queue)
+    end, new(), List).
+
+equal(Q1, Q2) ->
+    out_all(Q1) =:= out_all(Q2).
+
+out_all(Q) ->
+    out_all(Q, []).
+out_all(Q0, Acc) ->
+    case smoosh_priority_queue:out(Q0) of
+        {K, V, Q1} -> out_all(Q1, [{K, V} | Acc]);
+        false -> lists:reverse(Acc)
+    end.
diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl
new file mode 100644
index 0000000..e15c928
--- /dev/null
+++ b/src/smoosh/test/smoosh_tests.erl
@@ -0,0 +1,127 @@
+-module(smoosh_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-include("couch/src/couch_db_int.hrl").
+
+-define(KILOBYTE, binary:copy(<<"x">>, 1024)).
+
+%% ==========
+%% Setup
+%% ----------
+
+setup(ChannelType) ->
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    couch_db:close(Db),
+    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:flush(ChannelPid),
+    ok = config:set(config_section(ChannelType), "min_size", "200000", false),
+    ok = config:set("smoosh", "state_checkpoint_interval_in_sec", "1", false),
+    DbName.
+
+teardown(ChannelType, DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+    ok = config:delete(config_section(DbName), "min_size", false),
+    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:flush(ChannelPid),
+    ok.
+
+config_section(ChannelType) ->
+    "smoosh." ++ ChannelType.
+
+%% ==========
+%% Tests
+%% ----------
+
+smoosh_test_() ->
+    {
+        "Testing smoosh",
+        {
+            setup,
+            fun() -> test_util:start_couch([smoosh]) end,
+            fun test_util:stop/1,
+            [
+                persistence_tests(),
+                channels_tests()
+            ]
+        }
+    }.
+
+persistence_tests() ->
+    Tests = [
+        fun should_persist_queue/2
+    ],
+    {
+        "Should persist queue state",
+        [
+              make_test_case("ratio_dbs", Tests)
+        ]
+    }.
+
+
+channels_tests() ->
+    Tests = [
+        fun should_enqueue/2
+    ],
+    {
+        "Various channels tests",
+        [
+              make_test_case("ratio_dbs", Tests)
+        ]
+    }.
+
+make_test_case(Type, Funs) ->
+    {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}.
+
+should_enqueue("ratio_dbs" = ChannelType, DbName) ->
+    ?_test(begin
+        ok = grow_db_file(DbName, 300),
+        ok = wait_enqueue(ChannelType, DbName),
+        ?assert(is_enqueued(ChannelType, DbName)),
+        ok
+    end).
+
+should_persist_queue(ChannelType, DbName) ->
+    ?_test(begin
+        ok = grow_db_file(DbName, 300),
+        ok = wait_enqueue(ChannelType, DbName),
+        Q = channel_queue(ChannelType),
+        ok = application:stop(smoosh),
+        ok = application:start(smoosh),
+        ?assertEqual(Q, channel_queue(ChannelType)),
+        ok
+    end).
+
+grow_db_file(DbName, SizeInKb) ->
+    {ok, #db{filepath = FilePath} = Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+    {ok, Fd} = file:open(FilePath, [append]),
+    Bytes = binary:copy(?KILOBYTE, SizeInKb),
+    file:write(Fd, Bytes),
+    ok = file:close(Fd),
+    Doc = couch_doc:from_json_obj({[
+        {<<"_id">>, ?l2b(?docid())},
+        {<<"value">>, ?l2b(?docid())}
+    ]}),
+    {ok, _} = couch_db:update_docs(Db, [Doc], []),
+    couch_db:close(Db),
+    ok.
+
+is_enqueued(ChannelType, DbName) ->
+    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:is_key(ChannelPid, DbName).
+
+wait_enqueue(ChannelType, DbName) ->
+    test_util:wait(fun() ->
+        case is_enqueued(ChannelType, DbName) of
+            false ->
+                wait;
+            true ->
+                ok
+        end
+    end).
+
+channel_queue(ChannelType) ->
+    Q0 = smoosh_priority_queue:new(ChannelType),
+    smoosh_priority_queue:open(Q0).