You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by bb...@apache.org on 2017/02/27 22:05:41 UTC
[2/3] couch-index commit: updated refs/heads/8409-view-cache to
5510d33
Add expiration functionality to couch_index_server
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/commit/c8fa7776
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/tree/c8fa7776
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/diff/c8fa7776
Branch: refs/heads/8409-view-cache
Commit: c8fa777669a5da4afcc2911144d519ba44a335b0
Parents: 53555fd
Author: Benjamin Bastian <be...@gmail.com>
Authored: Fri Jan 20 17:27:29 2017 -0800
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Tue Jan 24 12:46:50 2017 -0800
----------------------------------------------------------------------
src/couch_index_server.erl | 159 ++++++++++++++++++++++++++--------------
1 file changed, 104 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/blob/c8fa7776/src/couch_index_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_server.erl b/src/couch_index_server.erl
index 4e86f5e..b46c368 100644
--- a/src/couch_index_server.erl
+++ b/src/couch_index_server.erl
@@ -13,10 +13,11 @@
-module(couch_index_server).
-behaviour(gen_server).
-behaviour(config_listener).
+-behaviour(couch_db_monitor).
-vsn(2).
--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
+-export([start_link/0, validate/2, get_index/4, get_index/3, get_index_int/3]).
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -28,14 +29,21 @@
handle_db_event/3
]).
+% Exported for couch_db_monitor behavior
+-export([close/2, tab_name/1]).
+
-include_lib("couch/include/couch_db.hrl").
-define(BY_SIG, couchdb_indexes_by_sig).
-define(BY_PID, couchdb_indexes_by_pid).
-define(BY_DB, couchdb_indexes_by_db).
-define(RELISTEN_DELAY, 5000).
+-define(MAX_INDEXES_OPEN, 50).
--record(st, {root_dir}).
+-record(st, {
+ root_dir,
+ monitor_state
+}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -94,21 +102,23 @@ get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
{ok, InitState} = Module:init(Db, DDoc),
{ok, FunResp} = Fun(InitState),
- {ok, Pid} = get_index(Module, InitState),
+ {ok, Pid} = get_index_int(Module, InitState, couch_db:is_system_db(Db)),
{ok, Pid, FunResp};
get_index(Module, Db, DDoc, _Fun) ->
{ok, InitState} = Module:init(Db, DDoc),
- get_index(Module, InitState).
+ get_index_int(Module, InitState, couch_db:is_system_db(Db)).
-get_index(Module, IdxState) ->
+get_index_int(Module, IdxState, SysOwned) ->
DbName = Module:get(db_name, IdxState),
Sig = Module:get(signature, IdxState),
- case ets:lookup(?BY_SIG, {DbName, Sig}) of
- [{_, Pid}] when is_pid(Pid) ->
+ Args = {Module, IdxState, DbName, Sig, SysOwned},
+ case couch_db_monitor:incref(?MODULE, {DbName, Sig}) of
+ ok ->
+ {ok, {Pid, Monitor, _SysOwned}} = couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}),
+ ok = couch_db_monitor:notify(Monitor),
{ok, Pid};
_ ->
- Args = {Module, IdxState, DbName, Sig},
gen_server:call(?MODULE, {get_index, Args}, infinity)
end.
@@ -116,60 +126,89 @@ get_index(Module, IdxState) ->
init([]) ->
process_flag(trap_exit, true),
ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
- ets:new(?BY_SIG, [protected, set, named_table]),
- ets:new(?BY_PID, [private, set, named_table]),
ets:new(?BY_DB, [protected, bag, named_table]),
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
RootDir = couch_index_util:root_dir(),
couch_file:init_delete_dir(RootDir),
- {ok, #st{root_dir=RootDir}}.
+ MaxIndexesOpen = list_to_integer(config:get("couchdb", "max_indexes_open", integer_to_list(?MAX_INDEXES_OPEN))),
+ MonState = couch_db_monitor:new(?MODULE, MaxIndexesOpen),
+ {ok, #st{root_dir=RootDir, monitor_state=MonState}}.
terminate(_Reason, _State) ->
- Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)],
+ Pids = [Pid || {Pid, _} <- ets:tab2list(tab_name(pid))],
lists:map(fun couch_util:shutdown_sync/1, Pids),
ok.
+make_room(State, false) ->
+ case couch_db_monitor:maybe_close_idle(State#st.monitor_state) of
+ {ok, NewMonState} ->
+ {ok, State#st{monitor_state=NewMonState}};
+ Other ->
+ Other
+ end;
+make_room(State, true) ->
+ {ok, State}.
-handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
- case ets:lookup(?BY_SIG, {DbName, Sig}) of
- [] ->
- spawn_link(fun() -> new_index(Args) end),
- ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
- {noreply, State};
- [{_, Waiters}] when is_list(Waiters) ->
- ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
+handle_call({get_index, {_Mod, _IdxState, DbName, Sig, SysOwned}=Args}, From, State) ->
+ case couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}) of
+ not_found ->
+ case make_room(State, SysOwned) of
+ {ok, NewState} ->
+ spawn_link(fun() -> new_index(Args) end),
+ Monitor = couch_db_monitor:spawn_link(?MODULE, {DbName, Sig}, SysOwned),
+ couch_db_monitor:insert(?MODULE, name, {DbName, Sig}, {[From], Monitor, SysOwned}),
+ {noreply, NewState};
+ {error, Reason} ->
+ {reply, {error, Reason}, State}
+ end;
+ {ok, {Waiters, Monitor, SysOwned}} when is_list(Waiters) ->
+ couch_db_monitor:insert(?MODULE, name, {DbName, Sig}, {[From | Waiters], Monitor, SysOwned}),
{noreply, State};
- [{_, Pid}] when is_pid(Pid) ->
+ {ok, {Pid, Monitor, _SysOwned}} when is_pid(Pid) ->
+ ok = couch_db_monitor:incref(?MODULE, {DbName, Sig}),
+ ok = couch_db_monitor:notify(Monitor, From),
{reply, {ok, Pid}, State}
end;
handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
- [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
- [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
+ {ok, {Waiters, Monitor, SysOwned}} = couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}),
link(Pid),
- add_to_ets(DbName, Sig, DDocId, Pid),
- {reply, ok, State};
-handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
- [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+ couch_db_monitor:insert(?MODULE, name, {DbName, Sig}, {Pid, Monitor, SysOwned}),
+ couch_db_monitor:insert(?MODULE, pid, Pid, {DbName, Sig}),
+ couch_db_monitor:insert(?MODULE, counters, {DbName, Sig}, 0),
+ ets:insert(?BY_DB, {DbName, {DDocId, Sig}}),
+ lists:foreach(fun(From) ->
+ {Client, _} = From,
+ ok = couch_db_monitor:incref(?MODULE, {DbName, Sig}),
+ ok = couch_db_monitor:notify(Monitor, Client),
+ gen_server:reply(From, {ok, Pid})
+ end, Waiters),
+ {reply, ok, State#st{monitor_state=couch_db_monitor:opened(State#st.monitor_state, SysOwned)}};
+handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {FromPid, _}, State) ->
+ {ok, {Waiters, Monitor, _SO}} = couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
- ets:delete(?BY_SIG, {DbName, Sig}),
+ couch_db_monitor:delete(?MODULE, {DbName, Sig}, FromPid),
+ ok = couch_db_monitor:close(Monitor),
{reply, ok, State};
handle_call({reset_indexes, DbName}, _From, State) ->
- reset_indexes(DbName, State#st.root_dir),
- {reply, ok, State}.
+ {reply, ok, reset_indexes(DbName, State)};
+handle_call(open_index_count, _From, State) ->
+ {reply, couch_db_monitor:num_open(State#st.monitor_state), State};
+handle_call(get_server, _From, State) ->
+ {reply, State, State}.
handle_cast({reset_indexes, DbName}, State) ->
- reset_indexes(DbName, State#st.root_dir),
- {noreply, State}.
+ {noreply, reset_indexes(DbName, State)}.
handle_info({'EXIT', Pid, Reason}, Server) ->
- case ets:lookup(?BY_PID, Pid) of
- [{Pid, {DbName, Sig}}] ->
+ case couch_db_monitor:lookup(?MODULE, pid, Pid) of
+ {ok, {DbName, Sig}} ->
[{DbName, {DDocId, Sig}}] =
ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
rem_from_ets(DbName, Sig, DDocId, Pid);
- [] when Reason /= normal ->
+ not_found when Reason /= normal ->
+ couch_log:error("Looked up: ~p ~p ~p", [?MODULE, pid, Pid]),
exit(Reason);
_Else ->
ok
@@ -197,8 +236,10 @@ handle_config_change("couchdb", "index_dir", _, _, _) ->
handle_config_change("couchdb", "view_index_dir", _, _, _) ->
exit(whereis(couch_index_server), config_change),
remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
- {ok, RootDir}.
+handle_config_change("couchdb", "max_indexes_open", Max, _, _) when is_list(Max) ->
+ {ok, gen_server:call(?MODULE, {set_max_indexes_open, list_to_integer(Max)})};
+handle_config_change(_, _, _, _, _) ->
+ {ok, nil}.
handle_config_terminate(_, stop, _) ->
ok;
@@ -206,7 +247,18 @@ handle_config_terminate(_Server, _Reason, _State) ->
erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener),
{ok, couch_index_util:root_dir()}.
-new_index({Mod, IdxState, DbName, Sig}) ->
+% couch_db_monitor behavior implementation
+tab_name(name) -> couch_index;
+tab_name(pid) -> couch_index_by_pid;
+tab_name(counters) -> couch_index_counters;
+tab_name(idle) -> couch_index_idle.
+
+close(DbNameSig, {Pid, _Monitor, SysOwned}) ->
+ couch_log:error("Closing Index: ~p", [DbNameSig]),
+ couch_index:stop(Pid),
+ {true, SysOwned}.
+
+new_index({Mod, IdxState, DbName, Sig, _SysOwned}) ->
DDocId = Mod:get(idx_name, IdxState),
case couch_index:start_link({Mod, IdxState}) of
{ok, Pid} ->
@@ -219,29 +271,26 @@ new_index({Mod, IdxState, DbName, Sig}) ->
end.
-reset_indexes(DbName, Root) ->
+reset_indexes(DbName, State) ->
+ #st{root_dir=Root, monitor_state=MonState} = State,
% shutdown all the updaters and clear the files, the db got changed
- Fun = fun({_, {DDocId, Sig}}) ->
- [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+ Fun = fun({_, {DDocId, Sig}}, MonStateAcc) ->
+ {ok, {Pid, Monitor, SysOwned}} = couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}),
+ couch_db_monitor:close(Monitor),
MRef = erlang:monitor(process, Pid),
gen_server:cast(Pid, delete),
receive {'DOWN', MRef, _, _, _} -> ok end,
- rem_from_ets(DbName, Sig, DDocId, Pid)
+ rem_from_ets(DbName, Sig, DDocId, Pid),
+ couch_db_monitor:closed(MonStateAcc, SysOwned)
end,
- lists:foreach(Fun, ets:lookup(?BY_DB, DbName)),
+ NewMonState = lists:foldl(Fun, MonState, ets:lookup(?BY_DB, DbName)),
Path = couch_index_util:index_dir("", DbName),
- couch_file:nuke_dir(Root, Path).
-
-
-add_to_ets(DbName, Sig, DDocId, Pid) ->
- ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
- ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
- ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
+ couch_file:nuke_dir(Root, Path),
+ State#st{monitor_state=NewMonState}.
rem_from_ets(DbName, Sig, DDocId, Pid) ->
- ets:delete(?BY_SIG, {DbName, Sig}),
- ets:delete(?BY_PID, Pid),
+ couch_db_monitor:delete(?MODULE, {DbName, Sig}, Pid),
ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}).
@@ -253,10 +302,10 @@ handle_db_event(DbName, deleted, St) ->
{ok, St};
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(fun({_DbName, {_DDocId, Sig}}) ->
- case ets:lookup(?BY_SIG, {DbName, Sig}) of
- [{_, IndexPid}] ->
+ case couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}) of
+ {ok, {IndexPid, _Monitor, _SysOwned}} ->
(catch gen_server:cast(IndexPid, ddoc_updated));
- [] ->
+ not_found ->
ok
end
end, ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}})),