You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/11/13 05:33:56 UTC
[couchdb] 04/04: smoosh optimization wip
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch optimize-smoosh
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit b7206c1f4b471b8fdfe7dc1e837fbeb508b6d5b1
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Tue Nov 8 14:30:22 2022 -0500
smoosh optimization wip
---
src/smoosh/src/smoosh_channel.erl | 689 ++++++++++++++++----------------------
src/smoosh/src/smoosh_persist.erl | 11 +-
src/smoosh/src/smoosh_server.erl | 357 +++++++++++---------
src/smoosh/test/smoosh_tests.erl | 481 ++++++++++++++++++--------
4 files changed, 854 insertions(+), 684 deletions(-)
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index 50274f704..3ca515679 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -12,52 +12,50 @@
-module(smoosh_channel).
-behaviour(gen_server).
--vsn(1).
--include_lib("couch/include/couch_db.hrl").
% public api.
--export([start_link/1, close/1, suspend/1, resume/1, activate/1, get_status/1]).
--export([enqueue/3, last_updated/2, flush/1, is_key/2, is_activated/1]).
+-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
+-export([enqueue/3, flush/1]).
+-export([get_status_table/1]).
% gen_server api.
-export([
init/1,
handle_call/3,
handle_cast/2,
- handle_info/2,
- terminate/2
+ handle_info/2
]).
--define(VSN, 1).
--define(CHECKPOINT_INTERVAL_IN_MSEC, 180000).
+-define(INDEX_CLEANUP, index_cleanup).
+-define(STATUS_UPDATE_INTERVAL_MSEC, 4900).
+-define(TIME_WINDOW_MSEC, 60 * 1000).
-ifndef(TEST).
--define(START_DELAY_IN_MSEC, 60000).
--define(ACTIVATE_DELAY_IN_MSEC, 30000).
+-define(CHECKPOINT_INTERVAL_MSEC, 180000).
-else.
--define(START_DELAY_IN_MSEC, 0).
--define(ACTIVATE_DELAY_IN_MSEC, 0).
--export([persist/1]).
+-define(CHECKPOINT_INTERVAL_MSEC, 1000).
-endif.
-% records.
-
-% When the state is set to activated = true, the channel has completed the state
-% recovery process that occurs on (re)start and is accepting new compaction jobs.
-% Note: if activated = false and a request for a new compaction job is received,
-% smoosh will enqueue this new job after the state recovery process has finished.
-% When the state is set to paused = false, the channel is actively compacting any
-% compaction jobs that are scheduled.
-% See operator_guide.md --> State diagram.
+% If persistence is configured, on startup the channel will try to load its
+% waiting queue from a persisted queue data file. Then, once every
+% CHECKPOINT_INTERVAL_MSEC, it will spawn a checkpointer process to write the
+% queue state to disk.
-record(state, {
- active = [],
+ % Channel name
name,
+ % smoosh_priority_queue object, restored on startup by smoosh_persist:unpersist/1
waiting,
+ % Paused flag. The channel starts in the paused state.
paused = true,
- starting = [],
- activated = false,
- requests = []
+ % #{Key => Pid}
+ active = #{},
+ % #{Ref => Key}
+ starting = #{},
+ % Monitor reference of the checkpointer process
+ cref,
+ % ETS status table handle. Used to publish channel status.
+ stab
}).
% public functions.
@@ -69,19 +67,19 @@ suspend(ServerRef) ->
gen_server:call(ServerRef, suspend).
resume(ServerRef) ->
- gen_server:call(ServerRef, resume_and_activate).
-
-activate(ServerRef) ->
- gen_server:call(ServerRef, activate).
+ gen_server:call(ServerRef, resume).
enqueue(ServerRef, Object, Priority) ->
gen_server:cast(ServerRef, {enqueue, Object, Priority}).
-last_updated(ServerRef, Object) ->
- gen_server:call(ServerRef, {last_updated, Object}).
-
-get_status(ServerRef) ->
- gen_server:call(ServerRef, status).
+get_status(StatusTab) when is_reference(StatusTab) ->
+ try ets:lookup(StatusTab, status) of
+ [{status, Status}] -> Status;
+ [] -> []
+ catch
+ error:badarg ->
+ []
+ end.
close(ServerRef) ->
gen_server:call(ServerRef, close).
@@ -89,137 +87,94 @@ close(ServerRef) ->
flush(ServerRef) ->
gen_server:call(ServerRef, flush).
-is_key(ServerRef, Key) ->
- gen_server:call(ServerRef, {is_key, Key}).
-
-is_activated(ServerRef) ->
- gen_server:call(ServerRef, is_activated).
-
-% Only exported to force persistence in tests
-persist(ServerRef) ->
- gen_server:call(ServerRef, persist).
+get_status_table(ServerRef) ->
+ gen_server:call(ServerRef, get_status_table).
% gen_server functions.
init(Name) ->
- erlang:send_after(60 * 1000, self(), check_window),
process_flag(trap_exit, true),
- Waiting = smoosh_priority_queue:new(Name),
- Persist = config:get_boolean("smoosh", "persist", false),
- State =
- case smoosh_utils:is_view_channel(Name) orelse Persist =:= false of
- true ->
- schedule_unpause(),
- #state{name = Name, waiting = Waiting, paused = true, activated = true};
- false ->
- erlang:send_after(?START_DELAY_IN_MSEC, self(), start_recovery),
- #state{name = Name, waiting = Waiting, paused = true, activated = false}
- end,
- {ok, State}.
-
-handle_call({last_updated, Object}, _From, State) ->
- LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
- {reply, LastUpdated, State};
-handle_call(suspend, _From, State) ->
- #state{active = Active} = State,
- [
- catch erlang:suspend_process(Pid, [unless_suspending])
- || {_, Pid} <- Active
- ],
+ process_flag(message_queue_data, off_heap),
+ schedule_check_window(),
+ schedule_update_status(),
+ schedule_checkpoint(),
+ schedule_unpause(),
+ State = #state{
+ name = Name,
+ waiting = smoosh_persist:unpersist(Name),
+ stab = ets:new(smoosh_stats, [{read_concurrency, true}])
+ },
+ {ok, set_status(State)}.
+
+handle_call(get_status_table, _From, #state{} = State) ->
+ {reply, {ok, State#state.stab}, State};
+handle_call(suspend, _From, #state{active = Active} = State) ->
+ [suspend_pid(Pid) || Pid <- maps:values(Active)],
{reply, ok, State#state{paused = true}};
-handle_call(resume_and_activate, _From, State) ->
- #state{active = Active} = State,
- [catch erlang:resume_process(Pid) || {_, Pid} <- Active],
- {reply, ok, State#state{paused = false, activated = true}};
-handle_call(activate, _From, State) ->
- {reply, ok, State#state{activated = true}};
-handle_call(status, _From, State) ->
- {reply,
- {ok, [
- {active, length(State#state.active)},
- {starting, length(State#state.starting)},
- {waiting, smoosh_priority_queue:info(State#state.waiting)}
- ]},
- State};
+handle_call(resume, _From, #state{active = Active} = State) ->
+ [resume_pid(Pid) || Pid <- maps:values(Active)],
+ {reply, ok, State#state{paused = false}};
handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call(flush, _From, #state{waiting = Q} = State) ->
- {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}};
-handle_call({is_key, Key}, _From, #state{waiting = Waiting} = State) ->
- {reply, smoosh_priority_queue:is_key(Key, Waiting), State};
-handle_call(is_activated, _From, #state{activated = Activated} = State0) ->
- {reply, Activated, State0};
-handle_call(persist, _From, State) ->
- persist_queue(State),
- {reply, ok, State}.
-
-handle_cast({enqueue, _Object, 0}, #state{} = State) ->
- {noreply, State};
-handle_cast({enqueue, Object, Priority}, #state{activated = true} = State) ->
- {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))};
-handle_cast({enqueue, Object, Priority}, #state{activated = false, requests = Requests} = State0) ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p Channel is not activated yet. Adding ~p to requests with priority ~p.", [
- ?MODULE,
- Object,
- Priority
- ]
- ),
- {noreply, State0#state{requests = [{Object, Priority} | Requests]}}.
+ State1 = State#state{waiting = smoosh_priority_queue:flush(Q)},
+ smoosh_persist:persist(State1#state.waiting, #{}, #{}),
+ {reply, ok, State1}.
+handle_cast({enqueue, Object, Priority}, #state{} = State) ->
+ State1 = add_to_queue(Object, Priority, State),
+ {noreply, maybe_start_compaction(State1)}.
+
+handle_info({'DOWN', Ref, _, _, _}, #state{cref = Ref} = State) ->
+ {noreply, State#state{cref = undefined}};
% We accept noproc here due to possibly having monitored a restarted compaction
% pid after it finished.
-handle_info({'DOWN', Ref, _, Job, Reason}, State) when
+handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) when
Reason == normal;
Reason == noproc
->
#state{active = Active, starting = Starting} = State,
- {noreply,
- maybe_start_compaction(
- State#state{
- active = lists:keydelete(Job, 2, Active),
- starting = lists:keydelete(Ref, 1, Starting)
- }
- )};
-handle_info({'DOWN', Ref, _, Job, Reason}, State) ->
- #state{active = Active0, starting = Starting0} = State,
- case lists:keytake(Job, 2, Active0) of
- {value, {Key, _Pid}, Active1} ->
- State1 = maybe_remonitor_cpid(
- State#state{active = Active1},
- Key,
- Reason
- ),
- {noreply, maybe_start_compaction(State1)};
- false ->
- case lists:keytake(Ref, 1, Starting0) of
- {value, {_, Key}, Starting1} ->
- couch_log:warning("failed to start compaction of ~p: ~p", [
- smoosh_utils:stringify(Key),
- Reason
- ]),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
- {noreply, maybe_start_compaction(State#state{starting = Starting1})};
- false ->
+ Active1 = maps:filter(fun(_, Pid) -> Pid =/= Job end, Active),
+ Starting1 = maps:remove(Ref, Starting),
+ State1 = State#state{active = Active1, starting = Starting1},
+ {noreply, maybe_start_compaction(State1)};
+% A monitored process went down with a reason other than normal/noproc. The
+% monitor belongs either to a running compaction (found in `active` by its
+% pid) or to a compaction start request which was not answered yet (found in
+% `starting` by its monitor reference). Anything else is ignored.
+handle_info({'DOWN', Ref, _, Job, Reason}, #state{} = State) ->
+    #state{name = Name, active = Active, starting = Starting} = State,
+    FoundActive = maps:filter(fun(_, Pid) -> Pid =:= Job end, Active),
+    case maps:to_list(FoundActive) of
+        [{Key, _Pid}] ->
+            % maps:without/2 expects a *list* of keys and raises badarg when
+            % given a bare key, so drop the single entry with maps:remove/2.
+            State1 = State#state{active = maps:remove(Key, Active)},
+            State2 = maybe_remonitor_cpid(State1, Key, Reason),
+            {noreply, maybe_start_compaction(State2)};
+        [] ->
+            case maps:take(Ref, Starting) of
+                {Key, Starting1} ->
+                    LogMsg = "~s : failed to start compaction of ~p: ~p",
+                    LogArgs = [Name, smoosh_utils:stringify(Key), Reason],
+                    couch_log:warning(LogMsg, LogArgs),
+                    % Give the job another chance later on
+                    re_enqueue(Key),
+                    State1 = State#state{starting = Starting1},
+                    {noreply, maybe_start_compaction(State1)};
+                error ->
                     {noreply, State}
             end
     end;
-handle_info({Ref, {ok, Pid}}, State) when is_reference(Ref) ->
- case lists:keytake(Ref, 1, State#state.starting) of
- {value, {_, Key}, Starting1} ->
+% This is the '$gen_call' response handling
+handle_info({Ref, {ok, Pid}}, #state{} = State) when is_reference(Ref) ->
+ #state{name = Name, active = Active, starting = Starting} = State,
+ case maps:take(Ref, Starting) of
+ {Key, Starting1} ->
Level = smoosh_utils:log_level("compaction_log_level", "notice"),
- couch_log:Level(
- "~s: Started compaction for ~s",
- [State#state.name, smoosh_utils:stringify(Key)]
- ),
+ LogMsg = "~s: Started compaction for ~s",
+ LogArgs = [Name, smoosh_utils:stringify(Key)],
+ couch_log:Level(LogMsg, LogArgs),
erlang:monitor(process, Pid),
erlang:demonitor(Ref, [flush]),
- {noreply, State#state{
- active = [{Key, Pid} | State#state.active],
- starting = Starting1
- }};
- false ->
+ Active1 = Active#{Key => Pid},
+ State1 = State#state{active = Active1, starting = Starting1},
+ {noreply, State1};
+ error ->
{noreply, State}
end;
handle_info(check_window, State) ->
@@ -235,7 +190,7 @@ handle_info(check_window, State) ->
State;
{false, true} ->
% resume is always safe even if we did not previously suspend
- {reply, ok, NewState} = handle_call(resume_and_activate, nil, State),
+ {reply, ok, NewState} = handle_call(resume, nil, State),
NewState;
{true, false} ->
if
@@ -246,234 +201,129 @@ handle_info(check_window, State) ->
State#state{paused = true}
end
end,
- erlang:send_after(60 * 1000, self(), check_window),
+ schedule_check_window(),
{noreply, FinalState};
-handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) ->
- RecActive = recover(active_file_name(Name)),
- Waiting1 = lists:foldl(
- fun(DbName, Acc) ->
- case couch_db:exists(DbName) andalso couch_db:is_compacting(DbName) of
- true ->
- Priority = smoosh_server:get_priority(Name, DbName),
- smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
- false ->
- Acc
- end
- end,
- Waiting0,
- RecActive
- ),
- State1 = maybe_start_compaction(State0#state{paused = false, waiting = Waiting1}),
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p Previously active compaction jobs (if any) have been successfully recovered and restarted.",
- [?MODULE]
- ),
- erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate),
- {noreply, State1#state{paused = true}};
-handle_info(activate, State) ->
- {noreply, activate_channel(State)};
-handle_info(persist, State) ->
- ok = persist_and_reschedule(State),
+handle_info(update_status, #state{} = State) ->
+ schedule_update_status(),
+ {noreply, set_status(State)};
+handle_info(checkpoint, #state{cref = Ref} = State) when is_reference(Ref) ->
+ % If a checkpointer process is still running, don't start another one.
+ schedule_checkpoint(),
{noreply, State};
+handle_info(checkpoint, #state{cref = undefined} = State) ->
+ % Start an asynchronous checkpoint process so we don't block the channel
+ #state{waiting = Waiting, active = Active, starting = Starting} = State,
+ Args = [Waiting, Active, Starting],
+ {_, Ref} = spawn_monitor(smoosh_persist, persist, Args),
+ schedule_checkpoint(),
+ {noreply, State#state{cref = Ref}};
handle_info(pause, State) ->
{noreply, State#state{paused = true}};
handle_info(unpause, State) ->
{noreply, maybe_start_compaction(State#state{paused = false})}.
-terminate(_Reason, _State) ->
- ok.
-
% private functions.
-recover(FilePath) ->
- case do_recover(FilePath) of
- {ok, List} ->
- List;
- error ->
- []
- end.
-
-do_recover(FilePath) ->
- case file:read_file(FilePath) of
- {ok, Content} ->
- <<Vsn, Binary/binary>> = Content,
- try parse_state(Vsn, ?VSN, Binary) of
- Term ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p Successfully restored state file ~s", [?MODULE, FilePath]
- ),
- {ok, Term}
- catch
- error:Reason ->
- couch_log:error(
- "~p Invalid state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
- ),
- file:delete(FilePath),
- error
- end;
- {error, enoent} ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p (~p) State file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
- ),
- error;
- {error, Reason} ->
- couch_log:error(
- "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
- ),
- file:delete(FilePath),
- error
- end.
-
-parse_state(1, ?VSN, Binary) ->
- erlang:binary_to_term(Binary, [safe]);
-parse_state(Vsn, ?VSN, _) ->
- error({unsupported_version, Vsn}).
-
-persist_and_reschedule(State) ->
- persist_queue(State),
- erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist),
- ok.
-
-persist_queue(#state{name = Name, active = Active, starting = Starting, waiting = Waiting}) ->
- Active1 = lists:foldl(
- fun({DbName, _}, Acc) ->
- [DbName | Acc]
- end,
- [],
- Active
- ),
- Starting1 = lists:foldl(
- fun({_, DbName}, Acc) ->
- [DbName | Acc]
- end,
- [],
- Starting
- ),
- smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN),
- smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN),
- smoosh_priority_queue:write_to_file(Waiting).
-
-active_file_name(Name) ->
- filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").
-
-starting_file_name(Name) ->
- filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").
+% Periodically publish channel status to avoid having to block on gen_server
+% get_status calls.
+%
+set_status(#state{} = State) ->
+ #state{active = Active, starting = Starting, waiting = Waiting} = State,
+ Status = [
+ {active, map_size(Active)},
+ {starting, map_size(Starting)},
+ {waiting, smoosh_priority_queue:info(Waiting)}
+ ],
+ true = ets:insert(State#state.stab, {status, Status}),
+ State.
add_to_queue(Key, Priority, State) ->
- #state{active = Active, waiting = Q} = State,
- case lists:keymember(Key, 1, Active) of
+ #state{name = Name, active = Active, waiting = Q} = State,
+ case is_map_key(Key, Active) of
true ->
State;
false ->
- Capacity = list_to_integer(smoosh_utils:get(State#state.name, "capacity", "9999")),
+ Capacity = smoosh_utils:capacity(State#state.name),
Level = smoosh_utils:log_level("compaction_log_level", "notice"),
- couch_log:Level(
- "~s: adding ~p to internal compactor queue with priority ~p",
- [State#state.name, Key, Priority]
- ),
- State#state{
- waiting = smoosh_priority_queue:in(Key, Priority, Priority, Capacity, Q)
- }
+ LogMsg = "~s: enqueueing ~p to compact with priority ~p",
+ LogArgs = [Name, Key, Priority],
+ couch_log:Level(LogMsg, LogArgs),
+ Q1 = smoosh_priority_queue:in(Key, Priority, Capacity, Q),
+ State#state{waiting = Q1}
end.
-maybe_activate(#state{activated = true} = State) ->
+maybe_start_compaction(#state{paused = true} = State) ->
State;
-maybe_activate(State) ->
- activate_channel(State).
-
-activate_channel(#state{name = Name, waiting = Waiting0, requests = Requests0} = State0) ->
- RecStarting = recover(starting_file_name(Name)),
- Starting = lists:foldl(
- fun(DbName, Acc) ->
- case couch_db:exists(DbName) of
- true ->
- Priority = smoosh_server:get_priority(Name, DbName),
- smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
- false ->
- Acc
- end
- end,
- Waiting0,
- RecStarting
- ),
- Waiting1 = smoosh_priority_queue:recover(Starting),
- Requests1 = lists:reverse(Requests0),
- Waiting2 = lists:foldl(
- fun({DbName, Priority}, Acc) ->
- case couch_db:exists(DbName) of
- true ->
- smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
- false ->
- Acc
- end
- end,
- Waiting1,
- Requests1
- ),
- State1 = maybe_start_compaction(State0#state{
- waiting = Waiting2, paused = false, activated = true, requests = []
- }),
- ok = persist_and_reschedule(State1),
- schedule_unpause(),
- State1#state{paused = true}.
+maybe_start_compaction(#state{paused = false, name = Name} = State) ->
+ Concurrency = smoosh_utils:concurrency(Name),
+ maybe_start_compaction(Concurrency, State).
-maybe_start_compaction(#state{paused = true} = State) ->
+maybe_start_compaction(Concurrency, #state{active = A, starting = S} = State) when
+ map_size(A) + map_size(S) >= Concurrency
+->
State;
-maybe_start_compaction(State) ->
- Concurrency = list_to_integer(
- smoosh_utils:get(
- State#state.name,
- "concurrency",
- "1"
- )
- ),
- if
- length(State#state.active) + length(State#state.starting) < Concurrency ->
- case smoosh_priority_queue:out(State#state.waiting) of
- false ->
- maybe_activate(State);
- {Key, Priority, Q} ->
- try
- State2 =
- case start_compact(State, Key) of
- false ->
- State;
- State1 ->
- Level = smoosh_utils:log_level(
- "compaction_log_level",
- "notice"
- ),
- couch_log:Level(
- "~s: Starting compaction for ~s (priority ~p)",
- [State#state.name, smoosh_utils:stringify(Key), Priority]
- ),
- State1
- end,
- maybe_start_compaction(State2#state{waiting = Q})
- catch
- Class:Exception ->
- couch_log:warning(
- "~s: ~p ~p for ~s",
- [
- State#state.name,
- Class,
- Exception,
- smoosh_utils:stringify(Key)
- ]
- ),
- maybe_start_compaction(State#state{waiting = Q})
- end
- end;
- true ->
+maybe_start_compaction(Concurrency, #state{} = State) ->
+ case smoosh_priority_queue:out(State#state.waiting) of
+ false ->
+ State;
+ {Key, Q} ->
+ State1 = State#state{waiting = Q},
+ % Re-check the priority: while the object sat in the queue, or was
+ % un-persisted after a node was down, the db or ddoc might have been
+ % deleted, and we don't want to crash the channel attempting to
+ % compact it
+ State2 =
+ case priority(State1, Key) of
+ 0 -> State1;
+ _ -> try_compact(State1, Key)
+ end,
+ maybe_start_compaction(Concurrency, State2)
+ end.
+
+priority(#state{name = Name}, Key) ->
+ try
+ smoosh_server:get_priority(Name, Key)
+ catch
+ Tag:Error ->
+ % We are being defensive as we don't want to crash the channel
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ LogMsg = "~s: Failed to get priority for ~s in queue ~p:~p",
+ LogArgs = [Name, smoosh_utils:stringify(Key), Tag, Error],
+ couch_log:Level(LogMsg, LogArgs),
+ 0
+ end.
+
+try_compact(#state{name = Name} = State, Key) ->
+ try start_compact(State, Key) of
+ false ->
+ State;
+ #state{} = State1 ->
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ LogMsg = "~s: Starting compaction for ~s",
+ LogArgs = [Name, smoosh_utils:stringify(Key)],
+ couch_log:Level(LogMsg, LogArgs),
+ State1
+ catch
+ Class:Exception ->
+ LogArgs = [Name, Class, Exception, smoosh_utils:stringify(Key)],
+ couch_log:warning("~s: ~p ~p for ~s", LogArgs),
State
end.
-start_compact(State, DbName) when is_list(DbName) ->
- start_compact(State, ?l2b(DbName));
-start_compact(State, DbName) when is_binary(DbName) ->
+% Start an index cleanup job for DbName. The job runs in a monitored helper
+% process which is tracked in `active` under the {index_cleanup, DbName} key.
+% Returns the updated state, or `false` if the database is ignored.
+start_compact(#state{} = State, {?INDEX_CLEANUP, DbName} = Key) ->
+    #state{name = Name, active = Active} = State,
+    % Use the same ignore check as the other start_compact/2 clauses.
+    % NOTE(review): the original called smoosh_utils:ignore/1 here; confirm
+    % that no such function was intended.
+    case smoosh_utils:ignore_db(DbName) of
+        false ->
+            {Pid, _Ref} = spawn_monitor(fun() -> cleanup_index_files(DbName) end),
+            Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+            LogMsg = "~s: Starting index cleanup for ~s",
+            LogArgs = [Name, smoosh_utils:stringify(Key)],
+            couch_log:Level(LogMsg, LogArgs),
+            State#state{active = Active#{Key => Pid}};
+        _ ->
+            false
+    end;
+% Start compaction for a database given by its (binary) name. Returns `false`
+% after logging a warning if the database file no longer exists.
+start_compact(#state{name = Name} = State, DbName) when is_binary(DbName) ->
     case couch_db:open_int(DbName, []) of
         {ok, Db} ->
             try
@@ -482,94 +332,139 @@ start_compact(State, DbName) when is_binary(DbName) ->
                 couch_db:close(Db)
             end;
         Error = {not_found, no_db_file} ->
-            couch_log:warning(
-                "Error starting compaction for ~p: ~p",
-                [smoosh_utils:stringify(DbName), Error]
-            ),
+            LogMsg = "~s : Error starting compaction for ~p: ~p",
+            LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+            couch_log:warning(LogMsg, LogArgs),
             false
     end;
-start_compact(State, {Shard, GroupId}) ->
+start_compact(#state{starting = Starting} = State, {Shard, GroupId}) ->
case smoosh_utils:ignore_db({Shard, GroupId}) of
false ->
- DbName = mem3:dbname(Shard),
{ok, Pid} = couch_index_server:get_index(
couch_mrview_index, Shard, GroupId
),
- spawn(fun() -> cleanup_index_files(DbName, Shard) end),
+ schedule_cleanup_index_files(Shard),
Ref = erlang:monitor(process, Pid),
Pid ! {'$gen_call', {self(), Ref}, compact},
- State#state{starting = [{Ref, {Shard, GroupId}} | State#state.starting]};
+ State#state{starting = Starting#{Ref => {Shard, GroupId}}};
_ ->
false
end;
-start_compact(State, Db) ->
- case smoosh_utils:ignore_db(Db) of
+start_compact(#state{} = State, Db) ->
+ #state{name = Name, starting = Starting, active = Active} = State,
+ Key = couch_db:name(Db),
+ case smoosh_utils:ignore_db(Key) of
false ->
DbPid = couch_db:get_pid(Db),
- Key = couch_db:name(Db),
case couch_db:get_compactor_pid(Db) of
nil ->
Ref = erlang:monitor(process, DbPid),
DbPid ! {'$gen_call', {self(), Ref}, start_compact},
- State#state{starting = [{Ref, Key} | State#state.starting]};
+ State#state{starting = Starting#{Ref => Key}};
% Compaction is already running, so monitor existing compaction pid.
- CPid ->
- Level = smoosh_utils:log_level("compaction_log_level", "notice"),
- couch_log:Level(
- "Db ~s continuing compaction",
- [smoosh_utils:stringify(Key)]
- ),
+ CPid when is_pid(CPid) ->
erlang:monitor(process, CPid),
- State#state{active = [{Key, CPid} | State#state.active]}
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ LogMsg = "~s : db ~s continuing compaction",
+ LogArgs = [Name, smoosh_utils:stringify(Key)],
+ couch_log:Level(LogMsg, LogArgs),
+ State#state{active = Active#{Key => CPid}}
end;
_ ->
false
end.
-maybe_remonitor_cpid(State, DbName, Reason) when is_binary(DbName) ->
+maybe_remonitor_cpid(#state{} = State, DbName, Reason) when is_binary(DbName) ->
+ #state{name = Name, active = Active} = State,
case couch_db:open_int(DbName, []) of
{ok, Db} ->
case couch_db:get_compactor_pid_sync(Db) of
nil ->
- couch_log:warning(
- "exit for compaction of ~p: ~p",
- [smoosh_utils:stringify(DbName), Reason]
- ),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [DbName]),
+ LogMsg = "~s : exit for compaction of ~p: ~p",
+ LogArgs = [Name, smoosh_utils:stringify(DbName), Reason],
+ couch_log:warning(LogMsg, LogArgs),
+ re_enqueue(DbName),
State;
- CPid ->
- Level = smoosh_utils:log_level("compaction_log_level", "notice"),
- couch_log:Level(
- "~s compaction already running. Re-monitor Pid ~p",
- [smoosh_utils:stringify(DbName), CPid]
- ),
+ CPid when is_pid(CPid) ->
erlang:monitor(process, CPid),
- State#state{active = [{DbName, CPid} | State#state.active]}
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ LogMsg = "~s: ~s compaction already running. Re-monitor Pid ~p",
+ LogArgs = [Name, smoosh_utils:stringify(DbName), CPid],
+ couch_log:Level(LogMsg, LogArgs),
+ State#state{active = Active#{DbName => CPid}}
end;
Error = {not_found, no_db_file} ->
- couch_log:warning(
- "exit for compaction of ~p: ~p",
- [smoosh_utils:stringify(DbName), Error]
- ),
+ LogMsg = "~s : exit for compaction of ~p: ~p",
+ LogArgs = [Name, smoosh_utils:stringify(DbName), Error],
+ couch_log:warning(LogMsg, LogArgs),
State
end;
% not a database compaction, so ignore the pid check
-maybe_remonitor_cpid(State, Key, Reason) ->
- couch_log:warning(
- "exit for compaction of ~p: ~p",
- [smoosh_utils:stringify(Key), Reason]
- ),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+maybe_remonitor_cpid(#state{name = Name} = State, Key, Reason) ->
+ LogMsg = "~s: exit for compaction of ~p: ~p",
+ LogArgs = [Name, smoosh_utils:stringify(Key), Reason],
+ couch_log:warning(LogMsg, LogArgs),
+ re_enqueue(Key),
State.
+schedule_check_window() ->
+ erlang:send_after(?TIME_WINDOW_MSEC, self(), check_window).
+
+schedule_update_status() ->
+ erlang:send_after(?STATUS_UPDATE_INTERVAL_MSEC, self(), update_status).
+
schedule_unpause() ->
- WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
+ WaitSecs = config:get_integer("smoosh", "wait_secs", 30),
erlang:send_after(WaitSecs * 1000, self(), unpause).
-cleanup_index_files(DbName, _Shard) ->
- case config:get("smoosh", "cleanup_index_files", "false") of
- "true" ->
- fabric:cleanup_index_files(DbName);
+schedule_checkpoint() ->
+ erlang:send_after(?CHECKPOINT_INTERVAL_MSEC, self(), checkpoint).
+
+% Ask smoosh_server to enqueue Obj again after a 5 second delay. Does nothing
+% if smoosh_server is not registered at the moment. Always returns ok.
+re_enqueue(Obj) ->
+    case whereis(smoosh_server) of
+        Pid when is_pid(Pid) ->
+            % A gen_server cast is delivered as {'$gen_cast', Request}, so the
+            % request has to be the single term {enqueue, Obj}. A flat
+            % {'$gen_cast', enqueue, Obj} tuple is not dispatched to
+            % handle_cast/2 and the object would never be re-enqueued.
+            Cast = {'$gen_cast', {enqueue, Obj}},
+            erlang:send_after(5000, Pid, Cast),
+            ok;
+        _ ->
+            ok
+    end.
+
+cleanup_index_files(DbName) ->
+ case should_clean_up_indices() of
+ true -> fabric:cleanup_index_files(DbName);
+ false -> ok
+ end.
+
+schedule_cleanup_index_files(Shard) ->
+ case should_clean_up_indices() of
+ true ->
+ % Since cleanup is at the cluster level, schedule it with a chance
+ % inversely proportional to the number of local shards
+ DbName = mem3:dbname(Shard),
+ try length(mem3:local_shards(DbName)) of
+ ShardCount when ShardCount >= 1 ->
+ case rand:uniform() < (1 / ShardCount) of
+ true ->
+ Arg = {?INDEX_CLEANUP, DbName},
+ smoosh_server:enqueue(Arg);
+ false ->
+ ok
+ end;
+ _ ->
+ ok
+ catch
+ error:database_does_not_exist ->
+ ok
+ end;
+ false ->
+ ok
+ end.
+
+should_clean_up_indices() ->
+ config:get_boolean("smoosh", "cleanup_index_files", true).
+
+suspend_pid(Pid) when is_pid(Pid) ->
+ catch erlang:suspend_process(Pid, [unless_suspending]).
+
+resume_pid(Pid) when is_pid(Pid) ->
+ catch erlang:resume_process(Pid).
diff --git a/src/smoosh/src/smoosh_persist.erl b/src/smoosh/src/smoosh_persist.erl
index 3b5418b04..b3ac1c43d 100644
--- a/src/smoosh/src/smoosh_persist.erl
+++ b/src/smoosh/src/smoosh_persist.erl
@@ -93,12 +93,12 @@ check_setup(true) ->
ok -> ok;
{error, Error2} -> throw({fail, "write", Error2})
end,
- couch_util:file_delete(Path).
+ delete_file(Path).
write(#{} = QData, Path) when is_list(Path), map_size(QData) == 0 ->
% Save a few bytes by deleting the persisted queue data if
% there are no waiting/starting or active jobs
- couch_util:file_delete(Path);
+ delete_file(Path);
write(#{} = QData, Path) when is_list(Path) ->
Bin = term_to_binary(QData, [compressed, {minor_version, 2}]),
TmpPath = tmp_path(Path),
@@ -133,6 +133,13 @@ state_dir() ->
Dir = config:get("smoosh", "state_dir", "."),
filename:absname(Dir).
+delete_file(Path) ->
+ % On Erlang 24+ we can avoid using the file server
+ case erlang:function_exported(file, delete, 2) of
+ true -> file:delete(Path, [raw]);
+ false -> file:delete(Path)
+ end.
+
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 27277be04..215277b42 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -12,27 +12,25 @@
-module(smoosh_server).
-behaviour(gen_server).
--vsn(4).
-behaviour(config_listener).
--include_lib("couch/include/couch_db.hrl").
+%-include_lib("couch/include/couch_db.hrl").
% public api.
-export([
start_link/0,
suspend/0,
resume/0,
+ flush/0,
enqueue/1,
sync_enqueue/1,
- sync_enqueue/2,
handle_db_event/3,
status/0
]).
--define(SECONDS_PER_MINUTE, 60).
-
% gen_server api.
-export([
init/1,
+ handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2,
@@ -46,30 +44,39 @@
% exported but for internal use.
-export([enqueue_request/2]).
-export([get_priority/2]).
-
-% exported for testing and debugging
--export([get_channel/1]).
+-export([access_cleaner/0]).
-ifdef(TEST).
+-define(STALENESS_MIN, 0).
+-define(ACCEESS_CLEAN_INTERVAL_MSEC, 300).
-define(RELISTEN_DELAY, 50).
-else.
+-define(STALENESS_MIN, 5).
+-define(ACCEESS_CLEAN_INTERVAL_MSEC, 3000).
-define(RELISTEN_DELAY, 5000).
-endif.
+-define(INDEX_CLEANUP, index_cleanup).
+-define(ACCESS, smoosh_access).
+-define(ACCESS_MAX_SIZE, 1000000).
+-define(ACCESS_NEVER, -1 bsl 58).
+
% private records.
-record(state, {
db_channels = [],
view_channels = [],
- tab,
+ cleanup_channels = [],
event_listener,
waiting = #{},
- waiting_by_ref = #{}
+ waiting_by_ref = #{},
+ access_cleaner
}).
-record(channel, {
name,
- pid
+ pid,
+ stab
}).
% public functions.
@@ -78,22 +85,35 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
suspend() ->
- gen_server:call(?MODULE, suspend).
+ gen_server:call(?MODULE, suspend, infinity).
resume() ->
- gen_server:call(?MODULE, resume).
+ gen_server:call(?MODULE, resume, infinity).
-status() ->
- gen_server:call(?MODULE, status).
+flush() ->
+ gen_server:call(?MODULE, flush, infinity).
-enqueue(Object) ->
- gen_server:cast(?MODULE, {enqueue, Object}).
+status() ->
+ try ets:foldl(fun get_channel_status/2, [], ?MODULE) of
+ Res -> {ok, Res}
+ catch
+ error:badarg ->
+ {ok, []}
+ end.
-sync_enqueue(Object) ->
- gen_server:call(?MODULE, {enqueue, Object}).
+enqueue(Object0) ->
+ Object = smoosh_utils:validate_arg(Object0),
+ case stale_enough(Object) of
+ true -> gen_server:cast(?MODULE, {enqueue, Object});
+ false -> ok
+ end.
-sync_enqueue(Object, Timeout) ->
- gen_server:call(?MODULE, {enqueue, Object}, Timeout).
+sync_enqueue(Object0) ->
+ Object = smoosh_utils:validate_arg(Object0),
+ case stale_enough(Object) of
+ true -> gen_server:call(?MODULE, {enqueue, Object}, infinity);
+ false -> ok
+ end.
handle_db_event(DbName, local_updated, St) ->
enqueue(DbName),
@@ -110,35 +130,29 @@ handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) ->
handle_db_event(_DbName, _Event, St) ->
{ok, St}.
-% for testing and debugging only
-get_channel(ChannelName) ->
- gen_server:call(?MODULE, {get_channel, ChannelName}).
-
% gen_server functions.
init([]) ->
process_flag(trap_exit, true),
+ process_flag(message_queue_data, off_heap),
ok = config:listen_for_changes(?MODULE, nil),
- {ok, Pid} = start_event_listener(),
- DbChannels = smoosh_utils:split(
- config:get("smoosh", "db_channels", "upgrade_dbs,ratio_dbs,slack_dbs")
- ),
- ViewChannels = smoosh_utils:split(
- config:get("smoosh", "view_channels", "upgrade_views,ratio_views,slack_views")
- ),
- Tab = ets:new(channels, [{keypos, #channel.name}]),
- {ok,
- create_missing_channels(#state{
- db_channels = DbChannels,
- view_channels = ViewChannels,
- event_listener = Pid,
- tab = Tab
- })}.
-
-handle_config_change("smoosh", "db_channels", L, _, _) ->
- {ok, gen_server:cast(?MODULE, {new_db_channels, smoosh_utils:split(L)})};
-handle_config_change("smoosh", "view_channels", L, _, _) ->
- {ok, gen_server:cast(?MODULE, {new_view_channels, smoosh_utils:split(L)})};
+ Opts = [named_table, {read_concurrency, true}],
+ ets:new(?MODULE, Opts ++ [{keypos, #channel.name}]),
+ ets:new(?ACCESS, Opts ++ [{write_concurrency, true}, public]),
+ State = #state{
+ access_cleaner = spawn_link(?MODULE, access_cleaner, []),
+ db_channels = smoosh_utils:db_channels(),
+ view_channels = smoosh_utils:view_channels(),
+ cleanup_channels = smoosh_utils:cleanup_channels()
+ },
+ {ok, State, {continue, create_channels}}.
+
+handle_config_change("smoosh", "db_channels", _, _, _) ->
+ {ok, gen_server:cast(?MODULE, new_db_channels)};
+handle_config_change("smoosh", "view_channels", _, _, _) ->
+ {ok, gen_server:cast(?MODULE, new_view_channels)};
+handle_config_change("smoosh", "cleanup_channels", _, _, _) ->
+ {ok, gen_server:cast(?MODULE, new_cleanup_channels)};
handle_config_change(_, _, _, _, _) ->
{ok, nil}.
@@ -151,51 +165,58 @@ handle_config_terminate(_Server, _Reason, _State) ->
restart_config_listener
).
-handle_call(status, _From, State) ->
- Acc = ets:foldl(fun get_channel_status/2, [], State#state.tab),
- {reply, {ok, Acc}, State};
+handle_continue(create_channels, #state{} = State) ->
+ % Warn users about smoosh persistence misconfiguration issues. Do it once
+ % on startup to avoid continuously spamming logs with errors.
+ smoosh_persist:check_setup(),
+ State1 = create_missing_channels(State),
+ {ok, Pid} = start_event_listener(),
+ {noreply, State1#state{event_listener = Pid}}.
+
handle_call({enqueue, Object}, _From, State) ->
{noreply, NewState} = handle_cast({enqueue, Object}, State),
{reply, ok, NewState};
handle_call(suspend, _From, State) ->
- ets:foldl(
- fun(#channel{name = Name, pid = P}, _) ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level("Suspending ~p", [Name]),
- smoosh_channel:suspend(P)
- end,
- 0,
- State#state.tab
- ),
+ Fun = fun(#channel{name = Name, pid = P}, _) ->
+ Level = smoosh_utils:log_level("compaction_log_level", "debug"),
+ couch_log:Level("Suspending ~p", [Name]),
+ smoosh_channel:suspend(P)
+ end,
+ ets:foldl(Fun, ok, ?MODULE),
{reply, ok, State};
handle_call(resume, _From, State) ->
- ets:foldl(
- fun(#channel{name = Name, pid = P}, _) ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level("Resuming ~p", [Name]),
- smoosh_channel:resume(P)
- end,
- 0,
- State#state.tab
- ),
+ Fun = fun(#channel{name = Name, pid = P}, _) ->
+ Level = smoosh_utils:log_level("compaction_log_level", "debug"),
+ couch_log:Level("Resuming ~p", [Name]),
+ smoosh_channel:resume(P)
+ end,
+ ets:foldl(Fun, ok, ?MODULE),
{reply, ok, State};
-handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) ->
- {reply, {ok, channel_pid(Tab, ChannelName)}, State}.
-
-handle_cast({new_db_channels, Channels}, State) ->
- [
- smoosh_channel:close(channel_pid(State#state.tab, C))
- || C <- State#state.db_channels -- Channels
- ],
- {noreply, create_missing_channels(State#state{db_channels = Channels})};
-handle_cast({new_view_channels, Channels}, State) ->
- [
- smoosh_channel:close(channel_pid(State#state.tab, C))
- || C <- State#state.view_channels -- Channels
- ],
- {noreply, create_missing_channels(State#state{view_channels = Channels})};
+handle_call(flush, _From, State) ->
+ Fun = fun(#channel{pid = P}, _) -> smoosh_channel:flush(P) end,
+ ets:foldl(Fun, ok, ?MODULE),
+ {reply, ok, State}.
+
+handle_cast(new_db_channels, #state{} = State) ->
+ Channels = smoosh_utils:db_channels(),
+ Closed = State#state.db_channels -- Channels,
+ [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+ State1 = State#state{db_channels = Channels},
+ {noreply, create_missing_channels(State1)};
+handle_cast(new_view_channels, #state{} = State) ->
+ Channels = smoosh_utils:view_channels(),
+ Closed = State#state.view_channels -- Channels,
+ [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+ State1 = State#state{view_channels = Channels},
+ {noreply, create_missing_channels(State1)};
+handle_cast(new_cleanup_channels, #state{} = State) ->
+ Channels = smoosh_utils:cleanup_channels(),
+ Closed = State#state.cleanup_channels -- Channels,
+ [smoosh_channel:close(channel_pid(C)) || C <- Closed],
+ State1 = State#state{cleanup_channels = Channels},
+ {noreply, create_missing_channels(State1)};
handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) ->
- case maps:is_key(Object, Waiting) of
+ case is_map_key(Object, Waiting) of
true ->
{noreply, State};
false ->
@@ -208,12 +229,17 @@ handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
couch_log:Level("update notifier died ~p", [Reason]),
{ok, Pid1} = start_event_listener(),
{noreply, State#state{event_listener = Pid1}};
+handle_info({'EXIT', Pid, Reason}, #state{access_cleaner = Pid} = State) ->
+ Level = smoosh_utils:log_level("compaction_log_level", "notice"),
+ couch_log:Level("access cleaner died ~p", [Reason]),
+ Pid1 = spawn_link(?MODULE, access_cleaner, []),
+ {noreply, State#state{access_cleaner = Pid1}};
handle_info({'EXIT', Pid, Reason}, State) ->
Level = smoosh_utils:log_level("compaction_log_level", "notice"),
couch_log:Level("~p ~p died ~p", [?MODULE, Pid, Reason]),
- case ets:match_object(State#state.tab, #channel{pid = Pid, _ = '_'}) of
+ case ets:match_object(?MODULE, #channel{pid = Pid, _ = '_'}) of
[#channel{name = Name}] ->
- ets:delete(State#state.tab, Name);
+ ets:delete(?MODULE, Name);
_ ->
ok
end,
@@ -226,13 +252,13 @@ handle_info(restart_config_listener, State) ->
handle_info(_Msg, State) ->
{noreply, State}.
-terminate(_Reason, State) ->
- ets:foldl(
- fun(#channel{pid = P}, _) -> smoosh_channel:close(P) end,
- 0,
- State#state.tab
- ),
- ok.
+terminate(_Reason, #state{access_cleaner = CPid}) ->
+ catch unlink(CPid),
+ exit(CPid, kill),
+ Fun = fun(#channel{pid = P}, _) ->
+ smoosh_channel:close(P)
+ end,
+ ets:foldl(Fun, ok, ?MODULE).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -251,18 +277,11 @@ remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) ->
{Ref, Waiting1} = maps:take(Object, Waiting),
State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
-get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) ->
- try gen_server:call(P, status) of
- {ok, Status} ->
- [{Name, Status} | Acc0];
- _ ->
- Acc0
- catch
- _:_ ->
- Acc0
- end;
-get_channel_status(_, Acc0) ->
- Acc0.
+get_channel_status(#channel{name = Name, stab = Tab}, Acc) ->
+ Status = smoosh_channel:get_status(Tab),
+ [{Name, Status} | Acc];
+get_channel_status(_, Acc) ->
+ Acc.
start_event_listener() ->
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]).
@@ -273,79 +292,92 @@ enqueue_request(State, Object) ->
false ->
ok;
{ok, Pid, Priority} ->
+ case ets:info(?ACCESS, size) of
+ Size when Size =< ?ACCESS_MAX_SIZE ->
+ Now = erlang:monotonic_time(second),
+ true = ets:insert(?ACCESS, {Object, Now});
+ _ ->
+ ok
+ end,
smoosh_channel:enqueue(Pid, Object, Priority)
end
catch
- Class:Exception:Stack ->
- couch_log:warning(
- "~s: ~p ~p for ~s : ~p",
- [
- ?MODULE,
- Class,
- Exception,
- smoosh_utils:stringify(Object),
- Stack
- ]
- )
+ Tag:Exception:Stack ->
+ Args = [?MODULE, Tag, Exception, smoosh_utils:stringify(Object), Stack],
+ couch_log:warning("~s: ~p ~p for ~s : ~p", Args),
+ ok
end.
-find_channel(#state{} = State, {Shard, GroupId}) ->
- find_channel(State#state.tab, State#state.view_channels, {Shard, GroupId});
+find_channel(#state{} = State, {?INDEX_CLEANUP, DbName}) ->
+ find_channel(State, State#state.cleanup_channels, {?INDEX_CLEANUP, DbName});
+find_channel(#state{} = State, {Shard, GroupId}) when is_binary(Shard) ->
+ find_channel(State, State#state.view_channels, {Shard, GroupId});
find_channel(#state{} = State, DbName) ->
- find_channel(State#state.tab, State#state.db_channels, DbName).
+ find_channel(State, State#state.db_channels, DbName).
-find_channel(_Tab, [], _Object) ->
+find_channel(#state{} = _State, [], _Object) ->
false;
-find_channel(Tab, [Channel | Rest], Object) ->
- Pid = channel_pid(Tab, Channel),
- LastUpdated = smoosh_channel:last_updated(Pid, Object),
- StalenessInSec =
- config:get_integer("smoosh", "staleness", 5) *
- ?SECONDS_PER_MINUTE,
- Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native),
- Now = erlang:monotonic_time(),
- Activated = smoosh_channel:is_activated(Pid),
- StaleEnough = LastUpdated =:= false orelse Now - LastUpdated > Staleness,
- case Activated andalso StaleEnough of
+find_channel(#state{} = State, [Channel | Rest], Object) ->
+ case stale_enough(Object) of
true ->
case smoosh_utils:ignore_db(Object) of
true ->
- find_channel(Tab, Rest, Object);
+ find_channel(State, Rest, Object);
_ ->
case get_priority(Channel, Object) of
0 ->
- find_channel(Tab, Rest, Object);
+ find_channel(State, Rest, Object);
Priority ->
- {ok, Pid, Priority}
+ {ok, channel_pid(Channel), Priority}
end
end;
false ->
- find_channel(Tab, Rest, Object)
+ find_channel(State, Rest, Object)
end.
-channel_pid(Tab, Channel) ->
- [#channel{pid = Pid}] = ets:lookup(Tab, Channel),
+% Returns true when Object was last recorded in the ?ACCESS table more than
+% min_staleness_sec() seconds ago. Both timestamps are taken from
+% erlang:monotonic_time(second), so the difference is in seconds.
+stale_enough(Object) ->
+ LastUpdatedSec = last_updated(Object),
+ Staleness = erlang:monotonic_time(second) - LastUpdatedSec,
+ Staleness > min_staleness_sec().
+
+channel_pid(Channel) ->
+ [#channel{pid = Pid}] = ets:lookup(?MODULE, Channel),
Pid.
-create_missing_channels(State) ->
- create_missing_channels(State#state.tab, State#state.db_channels),
- create_missing_channels(State#state.tab, State#state.view_channels),
+create_missing_channels(#state{} = State) ->
+ create_missing_channels_type(State#state.db_channels),
+ create_missing_channels_type(State#state.view_channels),
+ create_missing_channels_type(State#state.cleanup_channels),
State.
-create_missing_channels(_Tab, []) ->
+create_missing_channels_type([]) ->
ok;
-create_missing_channels(Tab, [Channel | Rest]) ->
- case ets:lookup(Tab, Channel) of
+create_missing_channels_type([Channel | Rest]) ->
+ case ets:lookup(?MODULE, Channel) of
[] ->
{ok, Pid} = smoosh_channel:start_link(Channel),
- true = ets:insert(Tab, [#channel{name = Channel, pid = Pid}]);
+ {ok, STab} = smoosh_channel:get_status_table(Pid),
+ Chan = #channel{
+ name = Channel,
+ pid = Pid,
+ stab = STab
+ },
+ true = ets:insert(?MODULE, Chan);
_ ->
ok
end,
- create_missing_channels(Tab, Rest).
+ create_missing_channels_type(Rest).
+get_priority(_Channel, {?INDEX_CLEANUP, DbName}) ->
+ try mem3:local_shards(mem3:dbname(DbName)) of
+ [_ | _] -> 1;
+ [] -> 0
+ catch
+ error:database_does_not_exist ->
+ 0
+ end;
get_priority(Channel, {Shard, GroupId}) ->
- case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
+ try couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
{ok, Pid} ->
try
{ok, ViewInfo} = couch_index:get_info(Pid),
@@ -368,9 +400,10 @@ get_priority(Channel, {Shard, GroupId}) ->
[Channel, Shard, GroupId, Reason]
),
0
+ catch
+ throw:{not_found, _} ->
+ 0
end;
-get_priority(Channel, DbName) when is_list(DbName) ->
- get_priority(Channel, ?l2b(DbName));
get_priority(Channel, DbName) when is_binary(DbName) ->
case couch_db:open_int(DbName, []) of
{ok, Db} ->
@@ -379,11 +412,8 @@ get_priority(Channel, DbName) when is_binary(DbName) ->
after
couch_db:close(Db)
end;
- Error = {not_found, no_db_file} ->
- couch_log:warning(
- "~p: Error getting priority for ~p: ~p",
- [Channel, DbName, Error]
- ),
+ {not_found, no_db_file} ->
+ % It's expected that a db might be deleted while waiting in queue
0
end;
get_priority(Channel, Db) ->
@@ -494,6 +524,31 @@ view_needs_upgrade(Props) ->
Enabled andalso length(Versions) >= 2
end.
+% Endless loop run by the process started with
+% spawn_link(?MODULE, access_cleaner, []). Each iteration sleeps for the
+% clean interval plus a random jitter, then deletes every ?ACCESS entry
+% whose timestamp is older than the minimum staleness window.
+access_cleaner() ->
+ JitterMSec = rand:uniform(?ACCEESS_CLEAN_INTERVAL_MSEC),
+ timer:sleep(?ACCEESS_CLEAN_INTERVAL_MSEC + JitterMSec),
+ NowSec = erlang:monotonic_time(second),
+ Limit = NowSec - min_staleness_sec(),
+ % Match spec: entries are {Object, AccessSec}; delete when AccessSec < Limit
+ Head = {'_', '$1'},
+ Guard = {'<', '$1', Limit},
+ ets:select_delete(?ACCESS, [{Head, [Guard], [true]}]),
+ access_cleaner().
+
+% Minimum staleness in seconds. The "smoosh" "staleness" config value is read
+% as an integer (default ?STALENESS_MIN) and multiplied by 60, i.e. the
+% configured value is treated as minutes.
+min_staleness_sec() ->
+ Min = config:get_integer("smoosh", "staleness", ?STALENESS_MIN),
+ Min * 60.
+
+% Look up the timestamp recorded for Object in the ?ACCESS ets table.
+% Returns the stored value when an entry exists and ?ACCESS_NEVER when
+% there is no entry for Object.
+last_updated(Object) ->
+ try ets:lookup(?ACCESS, Object) of
+ [{_, AccessSec}] ->
+ AccessSec;
+ [] ->
+ ?ACCESS_NEVER
+ catch
+ error:badarg ->
+ % ets:lookup/2 raises badarg when the ?ACCESS table does not exist.
+ % NOTE(review): this returns 0 rather than ?ACCESS_NEVER, so whether
+ % stale_enough/1 then passes depends on the sign of
+ % erlang:monotonic_time(second) -- confirm this is intended.
+ 0
+ end.
+
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl
index 73876888b..ee1d66992 100644
--- a/src/smoosh/test/smoosh_tests.erl
+++ b/src/smoosh/test/smoosh_tests.erl
@@ -3,152 +3,365 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-%% ==========
-%% Setup
-%% ----------
+smoosh_test_() ->
+ {
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ %% ?TDEF_FE(t_default_channels),
+ %% ?TDEF_FE(t_channels_recreated_on_crash),
+ %% ?TDEF_FE(t_can_create_and_delete_channels),
+ ?TDEF_FE(t_db_is_enqueued, 20),
+ ?TDEF_FE(t_x)
+ ]
+ }
+ }.
+
+setup_all() ->
+ meck:new(smoosh_server, [passthrough]),
+ meck:new(smoosh_channel, [passthrough]),
+ Ctx = test_util:start_couch([fabric]),
+ config:set("query_server_config", "commit_freq", "0", false),
+ Ctx.
+
+teardown_all(Ctx) ->
+ catch application:stop(smoosh),
+ config:delete("query_server_config", "commit_freq", false),
+ test_util:stop(Ctx),
+ meck:unload().
-setup(ChannelType) ->
+setup() ->
+ config:set("smoosh", "persist", "true", false),
+ config:set("smoosh.ratio_dbs", "min_size", "1", false),
+ config:set("smoosh.ratio_dbs", "min_priority", "1", false),
DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- couch_db:close(Db),
- {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
- smoosh_channel:flush(ChannelPid),
- ok = config:set("smoosh", "persist", "true", false),
- ok = config:set(config_section(ChannelType), "min_size", "1", false),
- ok = config:set(config_section(ChannelType), "min_priority", "1", false),
+ fabric:create_db(DbName, [{q, 1}]),
+ {ok, _} = create_ddoc(DbName, <<"_design/foo">>, <<"bar">>),
+ {ok, _} = create_doc(DbName, <<"doc1">>, 10000000),
+ {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>),
+ %{ok, _} = update_ddoc(DbName, <<"_design/foo">>, <<"bar">>),
+ %{ok, _} = delete_doc(DbName, <<"doc1">>),
+ %{ok, _} = fabric:query_view(DbName, <<"boo">>, <<"bar">>),
+ application:start(smoosh),
DbName.
-teardown(ChannelType, DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok = config:delete("smoosh", "persist", false),
- ok = config:delete(config_section(DbName), "min_size", false),
- ok = config:delete(config_section(DbName), "min_priority", false),
- meck:unload(),
- {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
- smoosh_channel:flush(ChannelPid),
+teardown(DbName) ->
+ flush(),
+ application:stop(smoosh),
+ fabric:delete_db(DbName),
+ meck:reset(smoosh_server),
+ meck:reset(smoosh_channel),
+ config:delete("smoosh", "db_channels", false),
+ config:delete("smoosh", "view_channels", false),
+ config:delete("smoosh", "cleanup_channels", false),
+ config:delete("smoosh", "persist", false),
+ config:delete("smoosh.ratio_dbs", "min_size", false),
+ config:delete("smoosh.ratio_dbs", "min_priority", false).
+
+t_default_channels(_) ->
+ % 3 default db channels + 3 default view channels + 1 index cleanup = 7
+ wait_for_channels(7),
+ ?assertMatch(
+ [
+ {"index_cleanup", _},
+ {"ratio_dbs", _},
+ {"ratio_views", _},
+ {"slack_dbs", _},
+ {"slack_views", _},
+ {"upgrade_dbs", _},
+ {"upgrade_views", _}
+ ],
+ status()
+ ).
+
+t_channels_recreated_on_crash(_) ->
+ wait_for_channels(7),
+ flush(),
+ RatioDbsPid = get_channel_pid("ratio_dbs"),
+ meck:reset(smoosh_channel),
+ exit(RatioDbsPid, kill),
+ meck:wait(1, smoosh_channel, start_link, 1, 3000),
+ wait_for_channels(7),
+ ?assertMatch([_, {"ratio_dbs", _} | _], status()),
+ ?assertNotEqual(RatioDbsPid, get_channel_pid("ratio_dbs")).
+
+t_can_create_and_delete_channels(_) ->
+ wait_for_channels(7),
+ flush(),
+ config:set("smoosh", "db_channels", "mychan1", false),
+ config:set("smoosh", "view_channels", "mychan2", false),
+ config:set("smoosh", "cleanup_channels", "mychan3", false),
+ wait_for_channels(10),
+ meck:reset(smoosh_channel),
+ config:delete("smoosh", "db_channels", false),
+ config:delete("smoosh", "view_channels", false),
+ config:delete("smoosh", "cleanup_channels", false),
+ wait_for_channels(7).
+
+t_db_is_enqueued(DbName) ->
+ wait_for_channels(7),
+ flush(),
+ meck:reset(smoosh_channel),
+ ?debugFmt("~n DELETING doc1: ~p ~n", [fabric:get_db_info(DbName)]),
+ timer:sleep(2000),
+ {ok, _} = delete_doc(DbName, <<"doc1">>),
+ ?debugFmt("~n DELETED doc1: ~p ~n", [fabric:get_db_info(DbName)]),
+ ok = wait_to_enqueue(DbName),
+ %ok = wait_compact_start(),
+ ok = wait_compact_done(),
+ timer:sleep(4000),
+ ?debugFmt("~n COMPACTED doc1: ~p ~n", [fabric:get_db_info(DbName)]),
+ ?debugVal(meck:capture(last, smoosh_channel, handle_info, 2, 1)),
ok.
-config_section(ChannelType) ->
- "smoosh." ++ ChannelType.
+t_x(DbName) ->
+ ok.
+
+%% t_smoosh2(DbName) ->
+%% io:format(standard_error, "~n +++++ ~p:~p@~B 2~n", [?MODULE, ?FUNCTION_NAME, ?LINE]),
+%% ?debugVal(DbName).
%% ==========
-%% Tests
+%% Setup
%% ----------
-smoosh_test_() ->
- {
- "Testing smoosh",
- {
- setup,
- fun() -> test_util:start_couch([smoosh]) end,
- fun test_util:stop/1,
- [
- channels_tests(),
- persistence_tests()
- ]
- }
- }.
+%% setup(ChannelType) ->
+%% DbName = ?tempdb(),
+%% {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+%% couch_db:close(Db),
+%% {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+%% smoosh_channel:flush(ChannelPid),
+%% ok = config:set("smoosh", "persist", "true", false),
+%% ok = config:set("smoosh.ratio_dbs", "min_size", "1", false),
+%% ok = config:set("smoosh.ratio_dbs", "min_priority", "1", false),
+%% DbName.
-persistence_tests() ->
- Tests = [
- fun should_persist_queue/2,
- fun should_call_recover/2,
- fun should_not_call_recover/2
- ],
- {
- "Various persistence tests",
- [
- make_test_case("ratio_dbs", Tests)
- ]
- }.
+%% teardown(ChannelType, DbName) ->
+%% ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+%% ok = config:delete("smoosh", "persist", false),
+%% ok = config:delete(config_section(DbName), "min_size", false),
+%% ok = config:delete(config_section(DbName), "min_priority", false),
+%% meck:unload(),
+%% {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+%% smoosh_channel:flush(ChannelPid),
+%% ok.
-channels_tests() ->
- Tests = [
- fun should_enqueue/2
- ],
- {
- "Various channels tests",
- [
- make_test_case("ratio_dbs", Tests)
- ]
- }.
+%% config_section(ChannelType) ->
+%% "smoosh." ++ ChannelType.
-make_test_case(Type, Funs) ->
- {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}.
-
-should_enqueue(ChannelType, DbName) ->
- ?_test(begin
- ok = grow_db_file(DbName, 300),
- ok = wait_enqueue(ChannelType, DbName),
- ?assert(is_enqueued(ChannelType, DbName)),
- ok
- end).
-
-should_persist_queue(ChannelType, DbName) ->
- ?_test(begin
- {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
- ok = grow_db_file(DbName, 300),
- ok = wait_enqueue(ChannelType, DbName),
- ok = smoosh_channel:persist(ChannelPid),
- Q0 = channel_queue(ChannelType),
- ok = application:stop(smoosh),
- ok = application:start(smoosh),
- Q1 = channel_queue(ChannelType),
- % Assert that queues are not empty
- ?assertNotEqual(Q0, smoosh_priority_queue:new(ChannelType)),
- ?assertNotEqual(Q1, smoosh_priority_queue:new(ChannelType)),
- ?assertEqual(Q0, Q1),
- ok
- end).
-
-should_call_recover(_ChannelType, _DbName) ->
- ?_test(begin
- ok = application:stop(smoosh),
- ok = config:set("smoosh", "persist", "true", false),
- meck:new(smoosh_priority_queue, [passthrough]),
- ok = application:start(smoosh),
- timer:sleep(1000),
- ?assertNotEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
- ok
- end).
-
-should_not_call_recover(_ChannelType, _DbName) ->
- ?_test(begin
- ok = application:stop(smoosh),
- ok = config:set("smoosh", "persist", "false", false),
- meck:new(smoosh_priority_queue, [passthrough]),
- ok = application:start(smoosh),
- timer:sleep(1000),
- ?assertEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
- ok
- end).
-
-grow_db_file(DbName, SizeInKb) ->
- {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
- Data = b64url:encode(crypto:strong_rand_bytes(SizeInKb * 1024)),
- Body = {[{<<"value">>, Data}]},
- Doc = #doc{id = <<"doc1">>, body = Body},
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- couch_db:close(Db),
- ok.
+%% %% ==========
+%% %% Tests
+%% %% ----------
-is_enqueued(ChannelType, DbName) ->
- {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
- smoosh_channel:is_key(ChannelPid, DbName).
-
-wait_enqueue(ChannelType, DbName) ->
- test_util:wait(
- fun() ->
- case is_enqueued(ChannelType, DbName) of
- false ->
- wait;
- true ->
- ok
- end
- end,
- 15000
- ).
+%% smoosh_test_() ->
+%% {
+%% "Testing smoosh",
+%% {
+%% setup,
+%% fun() -> test_util:start_couch([smoosh]) end,
+%% fun test_util:stop/1,
+%% [
+%% channels_tests(),
+%% persistence_tests()
+%% ]
+%% }
+%% }.
+
+%% persistence_tests() ->
+%% Tests = [
+%% fun should_persist_queue/2,
+%% fun should_call_recover/2,
+%% fun should_not_call_recover/2
+%% ],
+%% {
+%% "Various persistence tests",
+%% [
+%% make_test_case("ratio_dbs", Tests)
+%% ]
+%% }.
+
+%% channels_tests() ->
+%% Tests = [
+%% fun should_enqueue/2
+%% ],
+%% {
+%% "Various channels tests",
+%% [
+%% make_test_case("ratio_dbs", Tests)
+%% ]
+%% }.
+
+%% make_test_case(Type, Funs) ->
+%% {foreachx, fun setup/1, fun teardown/2, [{Type, Fun} || Fun <- Funs]}.
+
+%% should_enqueue(ChannelType, DbName) ->
+%% ?_test(begin
+%% ok = grow_db_file(DbName, 300),
+%% ok = wait_enqueue(ChannelType, DbName),
+%% ?assert(is_enqueued(ChannelType, DbName)),
+%% ok
+%% end).
+
+%% should_persist_queue(ChannelType, DbName) ->
+%% ?_test(begin
+%% {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+%% ok = grow_db_file(DbName, 300),
+%% ok = wait_enqueue(ChannelType, DbName),
+%% ok = smoosh_channel:persist(ChannelPid),
+%% Q0 = channel_queue(ChannelType),
+%% ok = application:stop(smoosh),
+%% ok = application:start(smoosh),
+%% Q1 = channel_queue(ChannelType),
+%% % Assert that queues are not empty
+%% ?assertNotEqual(Q0, smoosh_priority_queue:new(ChannelType)),
+%% ?assertNotEqual(Q1, smoosh_priority_queue:new(ChannelType)),
+%% ?assertEqual(Q0, Q1),
+%% ok
+%% end).
+
+%% should_call_recover(_ChannelType, _DbName) ->
+%% ?_test(begin
+%% ok = application:stop(smoosh),
+%% ok = config:set("smoosh", "persist", "true", false),
+%% meck:new(smoosh_priority_queue, [passthrough]),
+%% ok = application:start(smoosh),
+%% timer:sleep(1000),
+%% ?assertNotEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
+%% ok
+%% end).
+
+%% should_not_call_recover(_ChannelType, _DbName) ->
+%% ?_test(begin
+%% ok = application:stop(smoosh),
+%% ok = config:set("smoosh", "persist", "false", false),
+%% meck:new(smoosh_priority_queue, [passthrough]),
+%% ok = application:start(smoosh),
+%% timer:sleep(1000),
+%% ?assertEqual(0, meck:num_calls(smoosh_priority_queue, recover, '_')),
+%% ok
+%% end).
+
+%% grow_db_file(DbName, SizeInKb) ->
+%% {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+%% Data = b64url:encode(crypto:strong_rand_bytes(SizeInKb * 1024)),
+%% Body = {[{<<"value">>, Data}]},
+%% Doc = #doc{id = <<"doc1">>, body = Body},
+%% {ok, _} = couch_db:update_doc(Db, Doc, []),
+%% couch_db:close(Db),
+%% ok.
+
+%% is_enqueued(ChannelType, DbName) ->
+%% {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+%% smoosh_channel:is_key(ChannelPid, DbName).
+
+%% wait_enqueue(ChannelType, DbName) ->
+%% test_util:wait(
+%% fun() ->
+%% case is_enqueued(ChannelType, DbName) of
+%% false ->
+%% wait;
+%% true ->
+%% ok
+%% end
+%% end,
+%% 15000
+%% ).
+
+%% channel_queue(ChannelType) ->
+%% Q0 = smoosh_priority_queue:new(ChannelType),
+%% smoosh_priority_queue:recover(Q0).
+
+create_doc(DbName, DocId, Size) ->
+ Data = b64url:encode(crypto:strong_rand_bytes(Size)),
+ Doc = #doc{id = DocId, body = {[{<<"value">>, Data}]}},
+ fabric:update_doc(DbName, Doc, [?ADMIN_CTX]).
+
+create_ddoc(DbName, DocId, ViewName) ->
+ MapFun = <<"function(doc) { emit(doc.data, null); }">>,
+ DDoc = couch_doc:from_json_obj(
+ {[
+ {<<"_id">>, DocId},
+ {<<"language">>, <<"javascript">>},
+ {<<"autoupdate">>, false},
+ {<<"views">>,
+ {[
+ {ViewName,
+ {[
+ {<<"map">>, MapFun}
+ ]}}
+ ]}}
+ ]}
+ ),
+ fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]).
+
+update_ddoc(DbName, DDName, ViewName) ->
+ {ok, DDoc0} = fabric:open_doc(DbName, DDName, [?ADMIN_CTX]),
+ MapFun = <<"function(doc) { emit(doc.value, 1); }">>,
+ DDoc = DDoc0#doc{
+ body =
+ {[
+ {<<"language">>, <<"javascript">>},
+ {<<"autoupdate">>, false},
+ {<<"views">>,
+ {[
+ {ViewName,
+ {[
+ {<<"map">>, MapFun}
+ ]}}
+ ]}}
+ ]}
+ },
+ fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]).
+
+% Mark the document DDName in DbName as deleted by writing a new revision
+% with deleted = true. Returns the result of fabric:update_doc/3.
+delete_doc(DbName, DDName) ->
+ {ok, DDoc0} = fabric:open_doc(DbName, DDName, [?ADMIN_CTX]),
+ % The EJSON body must be a list of {Key, Value} tuples wrapped in a tuple
+ DDoc = DDoc0#doc{deleted = true, body = {[{<<"gone">>, true}]}},
+ fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]).
+
+status() ->
+ {ok, Props} = smoosh_server:status(),
+ lists:keysort(1, Props).
+
+flush() ->
+ ok = smoosh_server:flush().
+
+get_channel_pid(Chan) ->
+ [{channel, _, Pid, _}] = ets:lookup(smoosh_server, Chan),
+ Pid.
+
+wait_for_channels(N) when is_integer(N), N >= 0 ->
+ WaitFun = fun() ->
+ case length(status()) of
+ N -> ok;
+ _ -> wait
+ end
+ end,
+ test_util:wait(WaitFun).
+
+wait_to_enqueue(DbName) ->
+ [Shard] = mem3:shards(DbName),
+ ShardName = mem3:name(Shard),
+ meck:wait(smoosh_channel, handle_cast, [{enqueue, ShardName, '_'}, '_'], 4000).
+
+wait_compact_start() ->
+ meck:wait(smoosh_channel, handle_info, [{'_', {ok, '_'}}, '_'], 4000).
+
+wait_compact_done() ->
+ NormalDownPat = {'DOWN', '_', '_', '_', normal},
+ meck:wait(smoosh_channel, handle_info, [NormalDownPat, '_'], 4000).
-channel_queue(ChannelType) ->
- Q0 = smoosh_priority_queue:new(ChannelType),
- smoosh_priority_queue:recover(Q0).
+%% server_state() ->
+%% % matches #state{} record in smoosh_server.erl
+%% {
+%% state,
+%% DbChans,
+%% ViewChans,
+%% CleanupChans,
+%% _EventListner,
+%% _Waiting,
+%% _