You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2021/12/15 23:47:27 UTC
[couchdb] 01/04: Add smoosh queue persistence
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch pr-3766
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 1a8772fc58ac23dabdfc2135f7461e7c522ae548
Author: ncshaw <nc...@ibm.com>
AuthorDate: Mon Nov 29 18:04:59 2021 -0500
Add smoosh queue persistence
---
rel/overlay/etc/default.ini | 6 +
src/smoosh/src/smoosh_channel.erl | 177 ++++++++++++++++++++----
src/smoosh/src/smoosh_priority_queue.erl | 118 +++++++++++++---
src/smoosh/src/smoosh_server.erl | 23 +--
src/smoosh/test/smoosh_priority_queue_tests.erl | 146 +++++++++++++++++++
src/smoosh/test/smoosh_tests.erl | 127 +++++++++++++++++
6 files changed, 546 insertions(+), 51 deletions(-)
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 93aa1ca..0994d08 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -639,6 +639,12 @@ partitioned||* = true
;[smoosh.slack_views]
;priority = slack
;min_priority = 536870912
+;
+; Directory to store the state of the smoosh priority queue
+;state_dir = .
+;
+; Interval, in seconds, between checkpoints that write the smoosh priority queue state to its save file
+;state_checkpoint_interval_in_sec = 180
[ioq]
; The maximum number of concurrent in-flight IO requests that
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index 06849ac..14a1216 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -17,7 +17,7 @@
% public api.
-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
--export([enqueue/3, last_updated/2, flush/1]).
+-export([enqueue/3, last_updated/2, flush/1, is_key/2]).
% gen_server api.
-export([
@@ -25,18 +25,22 @@
handle_call/3,
handle_cast/2,
handle_info/2,
- code_change/3,
terminate/2
]).
+-define(DEFAULT_CHECKPOINT_INTERVAL_IN_SEC, 180).
+
+-define(VSN, 1).
+
% records.
-record(state, {
active = [],
name,
- waiting = smoosh_priority_queue:new(),
+ waiting,
paused = true,
- starting = []
+ starting = [],
+ opened = false
}).
% public functions.
@@ -65,19 +69,26 @@ close(ServerRef) ->
flush(ServerRef) ->
gen_server:call(ServerRef, flush).
+%% Synchronously asks the channel process ServerRef whether Key is present
+%% in its waiting queue; returns whatever the channel replies to the
+%% {is_key, Key} call.
+is_key(ServerRef, Key) ->
+    Request = {is_key, Key},
+    gen_server:call(ServerRef, Request).
+
% gen_server functions.
init(Name) ->
schedule_unpause(),
erlang:send_after(60 * 1000, self(), check_window),
- {ok, #state{name = Name}}.
+ process_flag(trap_exit, true),
+ Queue = smoosh_priority_queue:new(Name),
+ State = #state{name = Name, waiting = Queue},
+ ok = gen_server:cast(self(), init),
+ {ok, State}.
handle_call({last_updated, Object}, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
{reply, LastUpdated, State};
handle_call(suspend, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
#state{active = Active} = State,
[
catch erlang:suspend_process(Pid, [unless_suspending])
@@ -85,12 +96,12 @@ handle_call(suspend, _From, State0) ->
],
{reply, ok, State#state{paused = true}};
handle_call(resume, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
#state{active = Active} = State,
[catch erlang:resume_process(Pid) || {_, Pid} <- Active],
{reply, ok, State#state{paused = false}};
handle_call(status, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
{reply,
{ok, [
{active, length(State#state.active)},
@@ -99,17 +110,28 @@ handle_call(status, _From, State0) ->
]},
State};
handle_call(close, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {stop, normal, ok, State};
+ State = maybe_open_queue(State0),
+ #state{waiting = Q} = State,
+ smoosh_priority_queue:close(Q),
+ {stop, normal, ok, State#state{waiting = nil, opened = false}};
handle_call(flush, _From, State0) ->
- {ok, State} = code_change(nil, State0, nil),
- {reply, ok, State#state{waiting = smoosh_priority_queue:new()}}.
+ #state{waiting = Q} = State = maybe_open_queue(State0),
+ {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}};
+handle_call({is_key, Key}, _From, State0) ->
+ State = maybe_open_queue(State0),
+ #state{waiting = Waiting} = State,
+ {reply, smoosh_priority_queue:is_key(Key, Waiting), State}.
+handle_cast(init, State0) ->
+ State1 = maybe_recover_state(State0),
+ State2 = maybe_open_queue(State1),
+ {noreply, State3} = handle_info(persist_queue, State2),
+ {noreply, State3};
handle_cast({enqueue, _Object, 0}, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
{noreply, State};
handle_cast({enqueue, Object, Priority}, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
{noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}.
% We accept noproc here due to possibly having monitored a restarted compaction
@@ -118,7 +140,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
Reason == normal;
Reason == noproc
->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
#state{active = Active, starting = Starting} = State,
{noreply,
maybe_start_compaction(
@@ -128,7 +150,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
}
)};
handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
#state{active = Active0, starting = Starting0} = State,
case lists:keytake(Job, 2, Active0) of
{value, {Key, _Pid}, Active1} ->
@@ -151,7 +173,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
end
end;
handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
case lists:keytake(Ref, 1, State#state.starting) of
{value, {_, Key}, Starting1} ->
couch_log:notice(
@@ -168,7 +190,7 @@ handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
{noreply, State}
end;
handle_info(check_window, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
#state{paused = Paused, name = Name} = State,
StrictWindow = smoosh_utils:get(Name, "strict_window", "false"),
FinalState =
@@ -194,21 +216,128 @@ handle_info(check_window, State0) ->
end,
erlang:send_after(60 * 1000, self(), check_window),
{noreply, FinalState};
+handle_info(persist_queue, State0) ->
+ #state{waiting = Queue} = State0,
+ write_state_to_file(State0),
+ smoosh_priority_queue:write_to_file(Queue),
+ Checkpoint =
+ config:get_integer(
+ "smoosh", "state_checkpoint_interval_in_sec", ?DEFAULT_CHECKPOINT_INTERVAL_IN_SEC
+ ) * 1000,
+ erlang:send_after(Checkpoint, self(), persist_queue),
+ {noreply, State0};
handle_info(pause, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
{noreply, State#state{paused = true}};
handle_info(unpause, State0) ->
- {ok, State} = code_change(nil, State0, nil),
+ State = maybe_open_queue(State0),
{noreply, maybe_start_compaction(State#state{paused = false})}.
-terminate(_Reason, _State) ->
+terminate(_Reason, #state{name = Name, waiting = Q}) ->
+ file:delete(active_file_name(Name)),
+ file:delete(starting_file_name(Name)),
+ if
+ Q =/= nil ->
+ smoosh_priority_queue:close(Q);
+ true ->
+ nil
+ end,
ok.
-code_change(_OldVsn, #state{} = State, _Extra) ->
- {ok, State}.
+%% Reloads the persisted `active` and `starting` lists for this channel and
+%% keeps only those active entries for which a matching ".compact" file is
+%% found under the database directory or the view index directory.
+maybe_recover_state(#state{name = Name} = State) ->
+    SavedActive = recover(active_file_name(Name)),
+    SavedStarting = recover(starting_file_name(Name)),
+    Dirs = [
+        config:get("couchdb", "database_dir"),
+        config:get("couchdb", "view_index_dir")
+    ],
+    % Database matches come first, then view matches, as two concatenated lists.
+    StillActive = lists:append([get_matching_compact_files(Dir, SavedActive) || Dir <- Dirs]),
+    State#state{active = StillActive, starting = SavedStarting}.
+
+%% Recursively scans Dir for files whose base name ends in ".compact" and
+%% returns, in traversal order, every tuple of Active whose first element
+%% equals the file path with the ".compact" extension stripped.
+%%
+%% The dot before "compact" is escaped so that only a literal ".compact"
+%% suffix matches (the unescaped form also accepted e.g. "fooXcompact").
+%%
+%% NOTE(review): the lookup key is the full path returned by
+%% filelib:fold_files/5 minus the extension, so entries in Active are
+%% presumably keyed by that same path form -- confirm against the callers
+%% that populate `active`.
+get_matching_compact_files(Dir, Active) ->
+    Matching = filelib:fold_files(
+        Dir,
+        "^[a-zA-Z0-9_.-]*\\.compact$",
+        true,
+        fun(FilePath, Acc) ->
+            Prefix = filename:rootname(FilePath, ".compact"),
+            case lists:keyfind(Prefix, 1, Active) of
+                false ->
+                    Acc;
+                Entry ->
+                    [Entry | Acc]
+            end
+        end,
+        []
+    ),
+    lists:reverse(Matching).
+
+%% Returns the list stored in the state file at FilePath, or [] when
+%% do_recover/1 reports that nothing could be restored.
+recover(FilePath) ->
+    Outcome = do_recover(FilePath),
+    case Outcome of
+        error ->
+            [];
+        {ok, List} ->
+            List
+    end.
+
+%% Reads and decodes the versioned state file at FilePath.
+%%
+%% Returns {ok, Term} on success and the atom `error` otherwise. A missing
+%% file is only logged; an unreadable, empty, or undecodable file is logged
+%% and deleted so the next start does not trip over it again.
+%%
+%% The version byte is matched in the case head rather than with a bare
+%% `=` match, so a zero-length file is handled as an invalid file instead
+%% of crashing the channel with badmatch.
+do_recover(FilePath) ->
+    case file:read_file(FilePath) of
+        {ok, <<Vsn, Binary/binary>>} ->
+            try parse_state(Vsn, ?VSN, Binary) of
+                Term ->
+                    {ok, Term}
+            catch
+                error:Reason ->
+                    discard_state_file(
+                        "~p Invalid state file (~p). Deleting ~s", Reason, FilePath
+                    )
+            end;
+        {ok, <<>>} ->
+            discard_state_file("~p Invalid state file (~p). Deleting ~s", empty_file, FilePath);
+        {error, enoent} ->
+            couch_log:notice(
+                "~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
+            ),
+            error;
+        {error, Reason} ->
+            discard_state_file("~p Cannot read the state file (~p). Deleting ~s", Reason, FilePath)
+    end.
+
+%% Logs why the state file is unusable, removes it, and returns `error`.
+discard_state_file(Format, Reason, FilePath) ->
+    couch_log:error(Format, [?MODULE, Reason, FilePath]),
+    file:delete(FilePath),
+    error.
+
+%% Decodes a state payload written with on-disk version 1; any other
+%% version byte raises error:{unsupported_version, Vsn}. Decoding uses the
+%% `safe` option of binary_to_term/2.
+parse_state(1, ?VSN, Payload) ->
+    binary_to_term(Payload, [safe]);
+parse_state(Other, ?VSN, _Payload) ->
+    erlang:error({unsupported_version, Other}).
+
+%% Persists the channel's `active` and `starting` lists, each to its own
+%% per-channel state file.
+write_state_to_file(#state{} = State) ->
+    #state{name = Name, active = Active, starting = Starting} = State,
+    write_to_file(Active, active_file_name(Name)),
+    write_to_file(Starting, starting_file_name(Name)).
+
+%% Serialises List (prefixed with the ?VSN byte) and stores it at FileName.
+%%
+%% The data is written to "<FileName>.tmp" first and then renamed over the
+%% destination, so the previous checkpoint stays in place until the new one
+%% is completely on disk. The destination is deliberately NOT deleted before
+%% the rename: doing so would lose the last good checkpoint if the process
+%% died in between, or if the write had failed.
+%%
+%% Returns ok | {error, Reason}. A failed write is logged and the temporary
+%% file removed; the existing state file is left untouched.
+write_to_file(List, FileName) ->
+    OnDisk = <<?VSN, (erlang:term_to_binary(List, [compressed, {minor_version, 1}]))/binary>>,
+    TmpFileName = FileName ++ ".tmp",
+    % Best effort: clear any leftover from an interrupted earlier checkpoint.
+    file:delete(TmpFileName),
+    case file:write_file(TmpFileName, OnDisk, [sync]) of
+        ok ->
+            file:rename(TmpFileName, FileName);
+        {error, Reason} = Error ->
+            couch_log:error(
+                "~p Cannot write the state file ~s (~p)", [?MODULE, TmpFileName, Reason]
+            ),
+            file:delete(TmpFileName),
+            Error
+    end.
+
+%% Path of the file holding the channel's `active` list: "<Name>.active"
+%% inside the configured smoosh state_dir (default ".").
+active_file_name(Name) ->
+    StateDir = config:get("smoosh", "state_dir", "."),
+    filename:join(StateDir, Name ++ ".active").
+
+%% Path of the file holding the channel's `starting` list: "<Name>.starting"
+%% inside the configured smoosh state_dir (default ".").
+starting_file_name(Name) ->
+    StateDir = config:get("smoosh", "state_dir", "."),
+    filename:join(StateDir, Name ++ ".starting").
% private functions.
+%% Opens the waiting queue the first time it is needed and marks the state
+%% as opened; a state that is already opened is returned unchanged.
+maybe_open_queue(#state{opened = false, waiting = Queue} = State) ->
+    Opened = smoosh_priority_queue:open(Queue),
+    State#state{waiting = Opened, opened = true};
+maybe_open_queue(#state{opened = true} = State) ->
+    State.
+
add_to_queue(Key, Priority, State) ->
#state{active = Active, waiting = Q} = State,
case lists:keymember(Key, 1, Active) of
diff --git a/src/smoosh/src/smoosh_priority_queue.erl b/src/smoosh/src/smoosh_priority_queue.erl
index b6f4b6d..6492f02 100644
--- a/src/smoosh/src/smoosh_priority_queue.erl
+++ b/src/smoosh/src/smoosh_priority_queue.erl
@@ -12,32 +12,70 @@
-module(smoosh_priority_queue).
--export([new/0, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+-export([new/1]).
+
+-export([open/1, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+
+-export([flush/1, close/1]).
+
+-export([from_list/2, to_list/1]).
+
+-export([file_name/1, write_to_file/1]).
+
+-define(VSN, 1).
-record(priority_queue, {
- dict = dict:new(),
- tree = gb_trees:empty()
+ name,
+ map,
+ tree
}).
-new() ->
- #priority_queue{}.
+%% Creates an empty, named priority queue. Nothing is read from disk here;
+%% see open/1 for restoring persisted contents.
+new(Name) ->
+    #priority_queue{
+        name = Name,
+        map = #{},
+        tree = gb_trees:empty()
+    }.
+
+%% Restores the queue contents from the queue's file, if one can be read.
+%% On success the persisted map replaces the in-memory map and the tree is
+%% rebuilt from it; when nothing can be restored, Q is returned as is.
+%%
+%% NOTE(review): the rebuild destructures every map value as
+%% {TreeKey, Value}, whereas in/5 stores Key => {Priority, Now}. As written
+%% the rebuilt tree is therefore keyed by Priority alone, with the timestamp
+%% pair as the value -- confirm the intended on-disk shape.
+open(#priority_queue{name = Name} = Q) ->
+    case do_open(file_name(Q)) of
+        error ->
+            Q;
+        {ok, Terms} ->
+            Insert = fun(Key, {TreeKey, Value}, Acc) ->
+                gb_trees:enter(TreeKey, {Key, Value}, Acc)
+            end,
+            #priority_queue{
+                name = Name,
+                map = Terms,
+                tree = maps:fold(Insert, gb_trees:empty(), Terms)
+            }
+    end.
+
+%% Checkpoints the queue's map (prefixed with the ?VSN byte) to the queue
+%% file. Crashes with badmatch if the write or the rename fails.
+%%
+%% The data goes to "<file>.tmp" and is then renamed over the destination.
+%% The destination is not deleted first, so the previous checkpoint remains
+%% on disk until the new one replaces it.
+write_to_file(#priority_queue{map = Map} = Q) ->
+    OnDisk = <<?VSN, (erlang:term_to_binary(Map, [compressed, {minor_version, 1}]))/binary>>,
+    FileName = file_name(Q),
+    TmpFileName = FileName ++ ".tmp",
+    % Best effort: clear any leftover from an interrupted earlier checkpoint.
+    file:delete(TmpFileName),
+    ok = file:write_file(TmpFileName, OnDisk, [sync]),
+    ok = file:rename(TmpFileName, FileName).
-last_updated(Key, #priority_queue{dict = Dict}) ->
- case dict:find(Key, Dict) of
+%% Drops every queued item: deletes the queue file (result ignored) and
+%% returns the same named queue with an empty map and an empty tree.
+flush(#priority_queue{} = Q) ->
+    file:delete(file_name(Q)),
+    Q#priority_queue{map = maps:new(), tree = gb_trees:empty()}.
+
+%% Deletes the queue's file and returns the result of file:delete/1.
+close(#priority_queue{} = Q) ->
+    Path = file_name(Q),
+    file:delete(Path).
+
+last_updated(Key, #priority_queue{map=Map}) ->
+ case maps:find(Key, Map) of
{ok, {_Priority, {LastUpdatedMTime, _MInt}}} ->
LastUpdatedMTime;
error ->
false
end.
-is_key(Key, #priority_queue{dict = Dict}) ->
- dict:is_key(Key, Dict).
+%% True when Key has an entry in the queue's key index (the map).
+is_key(Key, #priority_queue{map = Index}) ->
+    maps:find(Key, Index) =/= error.
in(Key, Value, Priority, Q) ->
in(Key, Value, Priority, infinity, Q).
-in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) ->
- Tree1 =
+in(Key, Value, Priority, Capacity, #priority_queue{name=Name, map=Map, tree=Tree}) ->
+ Tree1 = case maps:find(Key, Map) of
case dict:find(Key, Dict) of
{ok, TreeKey} ->
gb_trees:delete_any(TreeKey, Tree);
@@ -47,23 +85,24 @@ in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) ->
Now = {erlang:monotonic_time(), erlang:unique_integer([monotonic])},
TreeKey1 = {Priority, Now},
Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1),
- Dict1 = dict:store(Key, TreeKey1, Dict),
- truncate(Capacity, #priority_queue{dict = Dict1, tree = Tree2}).
+ Map1 = maps:put(Key, TreeKey1, Map),
+ truncate(Capacity, #priority_queue{name=Name, map=Map1, tree=Tree2}).
-out(#priority_queue{dict = Dict, tree = Tree}) ->
+out(#priority_queue{name=Name, map=Map, tree=Tree}) ->
case gb_trees:is_empty(Tree) of
true ->
false;
false ->
{_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree),
- Dict1 = dict:erase(Key, Dict),
- {Key, Value, #priority_queue{dict = Dict1, tree = Tree1}}
+ Map1 = maps:remove(Key, Map),
+ Q = #priority_queue{name=Name, map=Map1, tree=Tree1},
+ {Key, Value, Q}
end.
size(#priority_queue{tree = Tree}) ->
gb_trees:size(Tree).
-info(#priority_queue{tree = Tree} = Q) ->
+info(#priority_queue{tree=Tree} = Q) ->
[
{size, ?MODULE:size(Q)}
| case gb_trees:is_empty(Tree) of
@@ -76,6 +115,17 @@ info(#priority_queue{tree = Tree} = Q) ->
end
].
+%% Builds a queue named like the given one from Orddict, an ordered list of
+%% {TreeKey, {Key, Value}} pairs as produced by to_list/1.
+%%
+%% The tree is built directly from Orddict. The map is the reverse index
+%% Key => TreeKey, which is the shape in/5 stores; building it with
+%% maps:from_list(Orddict) would index by TreeKey instead and leave the
+%% Key-based lookups unable to find any entry.
+from_list(Orddict, #priority_queue{name = Name}) ->
+    Index = lists:map(fun({TreeKey, {Key, _Value}}) -> {Key, TreeKey} end, Orddict),
+    Map = maps:from_list(Index),
+    Tree = gb_trees:from_orddict(Orddict),
+    #priority_queue{name = Name, map = Map, tree = Tree}.
+
+%% Returns the queue's tree as an ordered list of {TreeKey, Entry} pairs.
+to_list(#priority_queue{} = Q) ->
+    gb_trees:to_list(Q#priority_queue.tree).
+
+%% Path of the queue's persistence file: "<Name>.queue" inside the
+%% configured smoosh state_dir (default ".").
+file_name(#priority_queue{name = Name}) ->
+    StateDir = config:get("smoosh", "state_dir", "."),
+    filename:join(StateDir, Name ++ ".queue").
+
truncate(infinity, Q) ->
Q;
truncate(Capacity, Q) when Capacity > 0 ->
@@ -83,7 +133,37 @@ truncate(Capacity, Q) when Capacity > 0 ->
truncate(Capacity, Size, Q) when Size =< Capacity ->
Q;
-truncate(Capacity, Size, #priority_queue{dict = Dict, tree = Tree}) when Size > 0 ->
+truncate(Capacity, Size, #priority_queue{name=Name, map=Map, tree=Tree}) when Size > 0 ->
{_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree),
- Q1 = #priority_queue{dict = dict:erase(Key, Dict), tree = Tree1},
+ Q1 = #priority_queue{name=Name, map=maps:remove(Key, Map), tree=Tree1},
truncate(Capacity, ?MODULE:size(Q1), Q1).
+
+%% Reads and decodes the versioned queue file at FilePath.
+%%
+%% Returns {ok, Term} on success and the atom `error` otherwise. A missing
+%% file is only logged; an unreadable, empty, or undecodable file is logged
+%% and deleted.
+%%
+%% The version byte is matched in the case head rather than with a bare
+%% `=` match, so a zero-length file is handled as an invalid file instead
+%% of crashing the caller with badmatch.
+do_open(FilePath) ->
+    case file:read_file(FilePath) of
+        {ok, <<Vsn, Binary/binary>>} ->
+            try parse_queue(Vsn, ?VSN, Binary) of
+                Bin ->
+                    {ok, Bin}
+            catch
+                error:Reason ->
+                    discard_queue_file(
+                        "~p Invalid queue file (~p). Deleting ~s", Reason, FilePath
+                    )
+            end;
+        {ok, <<>>} ->
+            discard_queue_file("~p Invalid queue file (~p). Deleting ~s", empty_file, FilePath);
+        {error, enoent} ->
+            couch_log:notice(
+                "~p (~p) Queue file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
+            ),
+            error;
+        {error, Reason} ->
+            discard_queue_file("~p Cannot read the queue file (~p). Deleting ~s", Reason, FilePath)
+    end.
+
+%% Logs why the queue file is unusable, removes it, and returns `error`.
+discard_queue_file(Format, Reason, FilePath) ->
+    couch_log:error(Format, [?MODULE, Reason, FilePath]),
+    file:delete(FilePath),
+    error.
+
+%% Decodes a queue payload written with on-disk version 1; any other
+%% version byte raises error:{unsupported_version, Vsn}. Decoding uses the
+%% `safe` option of binary_to_term/2.
+parse_queue(1, ?VSN, Payload) ->
+    binary_to_term(Payload, [safe]);
+parse_queue(Other, ?VSN, _Payload) ->
+    erlang:error({unsupported_version, Other}).
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 0526625..e89412f 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -46,6 +46,9 @@
% exported but for internal use.
-export([enqueue_request/2]).
+% exported for testing and debugging
+-export([get_channel/1]).
+
-ifdef(TEST).
-define(RELISTEN_DELAY, 50).
-else.
@@ -60,7 +63,7 @@
schema_channels = [],
tab,
event_listener,
- waiting = dict:new()
+ waiting=maps:new()
}).
-record(channel, {
@@ -106,6 +109,10 @@ handle_db_event(DbName, {schema_updated, DDocId}, St) ->
handle_db_event(_DbName, _Event, St) ->
{ok, St}.
+% For testing and debugging only: synchronously asks the smoosh server for
+% the channel registered under ChannelName and returns the server's reply.
+get_channel(ChannelName) ->
+    Request = {get_channel, ChannelName},
+    gen_server:call(?MODULE, Request).
+
% gen_server functions.
init([]) ->
@@ -175,10 +182,10 @@ handle_call(resume, _From, State) ->
couch_log:notice("Resuming ~p", [Name]),
smoosh_channel:resume(P)
end,
- 0,
- State#state.tab
- ),
- {reply, ok, State}.
+ {reply, ok, State};
+
+handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) ->
+ {reply, {ok, channel_pid(Tab, ChannelName)}, State}.
handle_cast({new_db_channels, Channels}, State) ->
[
@@ -200,12 +207,12 @@ handle_cast({new_schema_channels, Channels}, State) ->
{noreply, create_missing_channels(State#state{view_channels = Channels})};
handle_cast({enqueue, Object}, State) ->
#state{waiting = Waiting} = State,
- case dict:is_key(Object, Waiting) of
+ case maps:is_key(Object, Waiting) of
true ->
{noreply, State};
false ->
{_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]),
- {noreply, State#state{waiting = dict:store(Object, Ref, Waiting)}}
+ {noreply, State#state{waiting=maps:put(Object, Ref, Waiting)}}
end.
handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
@@ -222,7 +229,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
end,
{noreply, create_missing_channels(State)};
handle_info({'DOWN', Ref, _, _, _}, State) ->
- Waiting = dict:filter(
+ Waiting = maps:filter(fun(_Key, Value) -> Value =/= Ref end,
fun(_Key, Value) -> Value =/= Ref end,
State#state.waiting
),
diff --git a/src/smoosh/test/smoosh_priority_queue_tests.erl b/src/smoosh/test/smoosh_priority_queue_tests.erl
new file mode 100644
index 0000000..668b8af
--- /dev/null
+++ b/src/smoosh/test/smoosh_priority_queue_tests.erl
@@ -0,0 +1,146 @@
+%% Tests for smoosh_priority_queue: a PropEr round-trip property plus two
+%% checks that open/1 survives a corrupted or missing queue file.
+%%
+%% All cases run inside one fixture that starts couch, because opening a
+%% queue reads the smoosh configuration and logs through couch_log. The
+%% individual cases are therefore plain helpers (prefix t_) that RETURN
+%% eunit test objects and are instantiated by the fixture; they are not
+%% named *_test / *_test_ so that eunit does not also pick them up outside
+%% the fixture.
+-module(smoosh_priority_queue_tests).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(PROP_PREFIX, "prop_").
+
+-define(CAPACITY, 3).
+
+%% A (practically) unique queue name derived from the current timestamp.
+-define(RANDOM_FILE, lists:flatten(io_lib:format("~p", [erlang:timestamp()]))).
+
+setup() ->
+    Ctx = test_util:start_couch(),
+    Ctx.
+
+teardown(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+smoosh_priority_queue_test_() ->
+    {
+        "smoosh priority queue test",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            % Instantiate the test objects here. Listing `fun Name/0`
+            % references to these helpers would make eunit merely call them
+            % and discard the returned test objects without running them.
+            [
+                t_prop_inverse(),
+                t_no_halt_on_corrupted_file(),
+                t_no_halt_on_missing_file()
+            ]
+        }
+    }.
+
+%% ==========
+%% Tests
+%% ----------
+
+%% test_property/1 already returns a complete eunit test description, so
+%% it must not be wrapped in ?_test (that would only evaluate the
+%% description and never run the property).
+t_prop_inverse() ->
+    test_property(prop_inverse).
+
+%% A file with an unsupported version byte must be discarded, and open/1
+%% must return the queue unchanged.
+t_no_halt_on_corrupted_file() ->
+    ?_test(begin
+        Name = ?RANDOM_FILE,
+        Q0 = smoosh_priority_queue:new(Name),
+        Q = smoosh_priority_queue:open(Q0),
+        FilePath = smoosh_priority_queue:file_name(Q),
+        ok = file:write_file(FilePath, <<"garbage">>),
+        ?assertEqual(Q, smoosh_priority_queue:open(Q0)),
+        ok
+    end).
+
+%% With no queue file on disk, open/1 must return the queue unchanged.
+t_no_halt_on_missing_file() ->
+    ?_test(begin
+        Name = ?RANDOM_FILE,
+        Q0 = smoosh_priority_queue:new(Name),
+        Q = smoosh_priority_queue:open(Q0),
+        FilePath = smoosh_priority_queue:file_name(Q),
+        % The file normally does not exist at this point, so the delete is
+        % best effort and its {error, enoent} result must not be asserted.
+        _ = file:delete(FilePath),
+        ?assertEqual(Q, smoosh_priority_queue:open(Q0)),
+        ok
+    end).
+
+%% ==========
+%% Properties
+%% ----------
+
+%% to_list/1 followed by from_list/2 yields a queue that drains in the
+%% same order as the original.
+prop_inverse() ->
+    ?FORALL(
+        Q,
+        queue(),
+        begin
+            List = smoosh_priority_queue:to_list(Q),
+            equal(Q, smoosh_priority_queue:from_list(List, Q))
+        end
+    ).
+
+%% ==========
+%% Generators
+%% ----------
+
+key() ->
+    proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
+value() ->
+    proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
+priority() -> integer().
+item() -> {key(), value(), priority()}.
+
+items_list() ->
+    ?LET(L, list(item()), L).
+
+simple_queue() ->
+    ?LET(
+        L,
+        items_list(),
+        from_list(L)
+    ).
+
+%% A queue that always holds at least one item (one is inserted here), from
+%% which one item is then removed two times out of three.
+with_deleted() ->
+    ?LET(
+        Q,
+        ?LET(
+            {{K0, V0, P0}, Q0},
+            {item(), simple_queue()},
+            smoosh_priority_queue:in(K0, V0, P0, ?CAPACITY, Q0)
+        ),
+        frequency([
+            {1, Q},
+            {2, element(3, smoosh_priority_queue:out(Q))}
+        ])
+    ).
+
+queue() ->
+    with_deleted().
+
+%% ==========================
+%% Proper related boilerplate
+%% --------------------------
+
+%% Turns a property name into a titled eunit test with a 60 second timeout.
+test_property(Property) when is_atom(Property) ->
+    test_property({atom_to_list(Property), Property});
+test_property({Id, Property}) ->
+    Name = string:sub_string(Id, length(?PROP_PREFIX) + 1),
+    Opts = [long_result, {numtests, 1000}, {to_file, user}],
+    {Name, {timeout, 60, fun() -> test_it(Property, Opts) end}}.
+
+test_it(Property, Opts) ->
+    case proper:quickcheck(?MODULE:Property(), Opts) of
+        true ->
+            true;
+        Else ->
+            erlang:error(
+                {propertyFailed, [
+                    {module, ?MODULE},
+                    {property, Property},
+                    {result, Else}
+                ]}
+            )
+    end.
+
+%% ================
+%% Helper functions
+%% ----------------
+
+new() ->
+    Q = smoosh_priority_queue:new("foo"),
+    smoosh_priority_queue:open(Q).
+
+from_list(List) ->
+    lists:foldl(
+        fun({Key, Value, Priority}, Queue) ->
+            smoosh_priority_queue:in(Key, Value, Priority, ?CAPACITY, Queue)
+        end,
+        new(),
+        List
+    ).
+
+%% Two queues are equal when draining them yields the same {Key, Value}
+%% sequence.
+equal(Q1, Q2) ->
+    out_all(Q1) =:= out_all(Q2).
+
+out_all(Q) ->
+    out_all(Q, []).
+out_all(Q0, Acc) ->
+    case smoosh_priority_queue:out(Q0) of
+        {K, V, Q1} -> out_all(Q1, [{K, V} | Acc]);
+        false -> lists:reverse(Acc)
+    end.
diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl
new file mode 100644
index 0000000..e15c928
--- /dev/null
+++ b/src/smoosh/test/smoosh_tests.erl
@@ -0,0 +1,127 @@
+%% Integration tests for smoosh: a database grown past the channel's
+%% min_size must be enqueued, and the queue must survive a restart of the
+%% smoosh application.
+-module(smoosh_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-include("couch/src/couch_db_int.hrl").
+
+-define(KILOBYTE, binary:copy(<<"x">>, 1024)).
+
+%% ==========
+%% Setup
+%% ----------
+
+%% Creates a temporary database, empties the channel's queue, and lowers
+%% the channel's min_size and the checkpoint interval for the test.
+setup(ChannelType) ->
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    couch_db:close(Db),
+    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:flush(ChannelPid),
+    ok = config:set(config_section(ChannelType), "min_size", "200000", false),
+    ok = config:set("smoosh", "state_checkpoint_interval_in_sec", "1", false),
+    DbName.
+
+teardown(ChannelType, DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+    % Remove the override from the section setup/1 wrote it to, i.e. the
+    % channel's section (not one derived from the database name).
+    ok = config:delete(config_section(ChannelType), "min_size", false),
+    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:flush(ChannelPid),
+    ok.
+
+config_section(ChannelType) ->
+    "smoosh." ++ ChannelType.
+
+%% ==========
+%% Tests
+%% ----------
+
+smoosh_test_() ->
+    {
+        "Testing smoosh",
+        {
+            setup,
+            fun() -> test_util:start_couch([smoosh]) end,
+            fun test_util:stop/1,
+            [
+                persistence_tests(),
+                channels_tests()
+            ]
+        }
+    }.
+
+persistence_tests() ->
+    Tests = [
+        fun should_persist_queue/2
+    ],
+    {
+        "Should persist queue state",
+        [
+            make_test_case("ratio_dbs", Tests)
+        ]
+    }.
+
+channels_tests() ->
+    Tests = [
+        fun should_enqueue/2
+    ],
+    {
+        "Various channels tests",
+        [
+            make_test_case("ratio_dbs", Tests)
+        ]
+    }.
+
+%% Runs every Fun under setup/1 and teardown/2 with Type as the fixture
+%% argument.
+make_test_case(Type, Funs) ->
+    {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}.
+
+should_enqueue("ratio_dbs" = ChannelType, DbName) ->
+    ?_test(begin
+        ok = grow_db_file(DbName, 300),
+        ok = wait_enqueue(ChannelType, DbName),
+        ?assert(is_enqueued(ChannelType, DbName)),
+        ok
+    end).
+
+%% The queue read back from disk after smoosh restarts must equal the one
+%% read back before the restart.
+should_persist_queue(ChannelType, DbName) ->
+    ?_test(begin
+        ok = grow_db_file(DbName, 300),
+        ok = wait_enqueue(ChannelType, DbName),
+        Q = channel_queue(ChannelType),
+        ok = application:stop(smoosh),
+        ok = application:start(smoosh),
+        ?assertEqual(Q, channel_queue(ChannelType)),
+        ok
+    end).
+
+%% Appends SizeInKb kilobytes of filler to the database file and then
+%% writes one document through couch_db.
+grow_db_file(DbName, SizeInKb) ->
+    {ok, #db{filepath = FilePath} = Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+    {ok, Fd} = file:open(FilePath, [append]),
+    Bytes = binary:copy(?KILOBYTE, SizeInKb),
+    % Assert the write: a silent failure would leave the file too small and
+    % show up only as a timeout in wait_enqueue/2.
+    ok = file:write(Fd, Bytes),
+    ok = file:close(Fd),
+    Doc = couch_doc:from_json_obj(
+        {[
+            {<<"_id">>, ?l2b(?docid())},
+            {<<"value">>, ?l2b(?docid())}
+        ]}
+    ),
+    {ok, _} = couch_db:update_docs(Db, [Doc], []),
+    couch_db:close(Db),
+    ok.
+
+is_enqueued(ChannelType, DbName) ->
+    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:is_key(ChannelPid, DbName).
+
+%% Polls via test_util:wait/1 until DbName shows up in the channel's queue.
+wait_enqueue(ChannelType, DbName) ->
+    test_util:wait(fun() ->
+        case is_enqueued(ChannelType, DbName) of
+            false ->
+                wait;
+            true ->
+                ok
+        end
+    end).
+
+%% Reads the channel's queue as currently persisted on disk.
+channel_queue(ChannelType) ->
+    Q0 = smoosh_priority_queue:new(ChannelType),
+    smoosh_priority_queue:open(Q0).