You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by bb...@apache.org on 2017/02/27 22:05:41 UTC

[2/3] couch-index commit: updated refs/heads/8409-view-cache to 5510d33

Add expiration functionality to couch_index_server


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/commit/c8fa7776
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/tree/c8fa7776
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/diff/c8fa7776

Branch: refs/heads/8409-view-cache
Commit: c8fa777669a5da4afcc2911144d519ba44a335b0
Parents: 53555fd
Author: Benjamin Bastian <be...@gmail.com>
Authored: Fri Jan 20 17:27:29 2017 -0800
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Tue Jan 24 12:46:50 2017 -0800

----------------------------------------------------------------------
 src/couch_index_server.erl | 159 ++++++++++++++++++++++++++--------------
 1 file changed, 104 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-index/blob/c8fa7776/src/couch_index_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_index_server.erl b/src/couch_index_server.erl
index 4e86f5e..b46c368 100644
--- a/src/couch_index_server.erl
+++ b/src/couch_index_server.erl
@@ -13,10 +13,11 @@
 -module(couch_index_server).
 -behaviour(gen_server).
 -behaviour(config_listener).
+-behaviour(couch_db_monitor).
 
 -vsn(2).
 
--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
+-export([start_link/0, validate/2, get_index/4, get_index/3, get_index_int/3]).
 
 -export([init/1, terminate/2, code_change/3]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -28,14 +29,21 @@
     handle_db_event/3
 ]).
 
+% Exported for couch_db_monitor behavior
+-export([close/2, tab_name/1]).
+
 -include_lib("couch/include/couch_db.hrl").
 
 -define(BY_SIG, couchdb_indexes_by_sig).
 -define(BY_PID, couchdb_indexes_by_pid).
 -define(BY_DB, couchdb_indexes_by_db).
 -define(RELISTEN_DELAY, 5000).
+-define(MAX_INDEXES_OPEN, 50).
 
--record(st, {root_dir}).
+-record(st, {
+    root_dir,
+    monitor_state
+}).
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -94,21 +102,23 @@ get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
 get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
     {ok, InitState} = Module:init(Db, DDoc),
     {ok, FunResp} = Fun(InitState),
-    {ok, Pid} = get_index(Module, InitState),
+    {ok, Pid} = get_index_int(Module, InitState, couch_db:is_system_db(Db)),
     {ok, Pid, FunResp};
 get_index(Module, Db, DDoc, _Fun) ->
     {ok, InitState} = Module:init(Db, DDoc),
-    get_index(Module, InitState).
+    get_index_int(Module, InitState, couch_db:is_system_db(Db)).
 
 
-get_index(Module, IdxState) ->
+get_index_int(Module, IdxState, SysOwned) ->
     DbName = Module:get(db_name, IdxState),
     Sig = Module:get(signature, IdxState),
-    case ets:lookup(?BY_SIG, {DbName, Sig}) of
-        [{_, Pid}] when is_pid(Pid) ->
+    Args = {Module, IdxState, DbName, Sig, SysOwned},
+    case couch_db_monitor:incref(?MODULE, {DbName, Sig}) of
+        ok ->
+            {ok, {Pid, Monitor, _SysOwned}} = couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}),
+            ok = couch_db_monitor:notify(Monitor),
             {ok, Pid};
         _ ->
-            Args = {Module, IdxState, DbName, Sig},
             gen_server:call(?MODULE, {get_index, Args}, infinity)
     end.
 
@@ -116,60 +126,89 @@ get_index(Module, IdxState) ->
 init([]) ->
     process_flag(trap_exit, true),
     ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
-    ets:new(?BY_SIG, [protected, set, named_table]),
-    ets:new(?BY_PID, [private, set, named_table]),
     ets:new(?BY_DB, [protected, bag, named_table]),
     couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
     RootDir = couch_index_util:root_dir(),
     couch_file:init_delete_dir(RootDir),
-    {ok, #st{root_dir=RootDir}}.
+    MaxIndexesOpen = list_to_integer(config:get("couchdb", "max_indexes_open", integer_to_list(?MAX_INDEXES_OPEN))),
+    MonState = couch_db_monitor:new(?MODULE, MaxIndexesOpen),
+    {ok, #st{root_dir=RootDir, monitor_state=MonState}}.
 
 
 terminate(_Reason, _State) ->
-    Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)],
+    Pids = [Pid || {Pid, _} <- ets:tab2list(tab_name(pid))],
     lists:map(fun couch_util:shutdown_sync/1, Pids),
     ok.
 
+make_room(State, false) ->
+    case couch_db_monitor:maybe_close_idle(State#st.monitor_state) of
+        {ok, NewMonState} ->
+            {ok, State#st{monitor_state=NewMonState}};
+        Other ->
+            Other
+    end;
+make_room(State, true) ->
+    {ok, State}.
 
-handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
-    case ets:lookup(?BY_SIG, {DbName, Sig}) of
-        [] ->
-            spawn_link(fun() -> new_index(Args) end),
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
-            {noreply, State};
-        [{_, Waiters}] when is_list(Waiters) ->
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
+handle_call({get_index, {_Mod, _IdxState, DbName, Sig, SysOwned}=Args}, From, State) ->
+    case couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}) of
+        not_found ->
+            case make_room(State, SysOwned) of
+                {ok, NewState} ->
+                    spawn_link(fun() -> new_index(Args) end),
+                    Monitor = couch_db_monitor:spawn_link(?MODULE, {DbName, Sig}, SysOwned),
+                    couch_db_monitor:insert(?MODULE, name, {DbName, Sig}, {[From], Monitor, SysOwned}),
+                    {noreply, NewState};
+                {error, Reason} ->
+                    {reply, {error, Reason}, State}
+            end;
+        {ok, {Waiters, Monitor, SysOwned}} when is_list(Waiters) ->
+            couch_db_monitor:insert(?MODULE, name, {DbName, Sig}, {[From | Waiters], Monitor, SysOwned}),
             {noreply, State};
-        [{_, Pid}] when is_pid(Pid) ->
+        {ok, {Pid, Monitor, _SysOwned}} when is_pid(Pid) ->
+            ok = couch_db_monitor:incref(?MODULE, {DbName, Sig}),
+            ok = couch_db_monitor:notify(Monitor, From),
             {reply, {ok, Pid}, State}
     end;
 handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
-    [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
+    {ok, {Waiters, Monitor, SysOwned}} = couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}),
     link(Pid),
-    add_to_ets(DbName, Sig, DDocId, Pid),
-    {reply, ok, State};
-handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    couch_db_monitor:insert(?MODULE, name, {DbName, Sig}, {Pid, Monitor, SysOwned}),
+    couch_db_monitor:insert(?MODULE, pid, Pid, {DbName, Sig}),
+    couch_db_monitor:insert(?MODULE, counters, {DbName, Sig}, 0),
+    ets:insert(?BY_DB, {DbName, {DDocId, Sig}}),
+    lists:foreach(fun(From) ->
+        {Client, _} = From,
+        ok = couch_db_monitor:incref(?MODULE, {DbName, Sig}),
+        ok = couch_db_monitor:notify(Monitor, Client),
+        gen_server:reply(From, {ok, Pid})
+    end, Waiters),
+    {reply, ok, State#st{monitor_state=couch_db_monitor:opened(State#st.monitor_state, SysOwned)}};
+handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {FromPid, _}, State) ->
+    {ok, {Waiters, Monitor, _SO}} = couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}),
     [gen_server:reply(From, Error) || From <- Waiters],
-    ets:delete(?BY_SIG, {DbName, Sig}),
+    couch_db_monitor:delete(?MODULE, {DbName, Sig}, FromPid),
+    ok = couch_db_monitor:close(Monitor),
     {reply, ok, State};
 handle_call({reset_indexes, DbName}, _From, State) ->
-    reset_indexes(DbName, State#st.root_dir),
-    {reply, ok, State}.
+    {reply, ok, reset_indexes(DbName, State)};
+handle_call(open_index_count, _From, State) ->
+    {reply, couch_db_monitor:num_open(State#st.monitor_state), State};
+handle_call(get_server, _From, State) ->
+    {reply, State, State}.
 
 
 handle_cast({reset_indexes, DbName}, State) ->
-    reset_indexes(DbName, State#st.root_dir),
-    {noreply, State}.
+    {noreply, reset_indexes(DbName, State)}.
 
 handle_info({'EXIT', Pid, Reason}, Server) ->
-    case ets:lookup(?BY_PID, Pid) of
-        [{Pid, {DbName, Sig}}] ->
+    case couch_db_monitor:lookup(?MODULE, pid, Pid) of
+        {ok, {DbName, Sig}} ->
             [{DbName, {DDocId, Sig}}] =
                 ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
             rem_from_ets(DbName, Sig, DDocId, Pid);
-        [] when Reason /= normal ->
+        not_found when Reason /= normal ->
+            couch_log:error("Looked up: ~p ~p ~p", [?MODULE, pid, Pid]),
             exit(Reason);
         _Else ->
             ok
@@ -197,8 +236,10 @@ handle_config_change("couchdb", "index_dir", _, _, _) ->
 handle_config_change("couchdb", "view_index_dir", _, _, _) ->
     exit(whereis(couch_index_server), config_change),
     remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
-    {ok, RootDir}.
+handle_config_change("couchdb", "max_indexes_open", Max, _, _) when is_list(Max) ->
+    {ok, gen_server:call(?MODULE, {set_max_indexes_open, list_to_integer(Max)})};
+handle_config_change(_, _, _, _, _) ->
+    {ok, nil}.
 
 handle_config_terminate(_, stop, _) ->
     ok;
@@ -206,7 +247,18 @@ handle_config_terminate(_Server, _Reason, _State) ->
     erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener),
     {ok, couch_index_util:root_dir()}.
 
-new_index({Mod, IdxState, DbName, Sig}) ->
+% couch_db_monitor behavior implementation
+tab_name(name) -> couch_index;
+tab_name(pid) -> couch_index_by_pid;
+tab_name(counters) -> couch_index_counters;
+tab_name(idle) -> couch_index_idle.
+
+close(DbNameSig, {Pid, _Monitor, SysOwned}) ->
+    couch_log:error("Closing Index: ~p", [DbNameSig]),
+    couch_index:stop(Pid),
+    {true, SysOwned}.
+
+new_index({Mod, IdxState, DbName, Sig, _SysOwned}) ->
     DDocId = Mod:get(idx_name, IdxState),
     case couch_index:start_link({Mod, IdxState}) of
         {ok, Pid} ->
@@ -219,29 +271,26 @@ new_index({Mod, IdxState, DbName, Sig}) ->
     end.
 
 
-reset_indexes(DbName, Root) ->
+reset_indexes(DbName, State) ->
+    #st{root_dir=Root, monitor_state=MonState} = State,
     % shutdown all the updaters and clear the files, the db got changed
-    Fun = fun({_, {DDocId, Sig}}) ->
-        [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    Fun = fun({_, {DDocId, Sig}}, MonStateAcc) ->
+        {ok, {Pid, Monitor, SysOwned}} = couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}),
+        couch_db_monitor:close(Monitor),
         MRef = erlang:monitor(process, Pid),
         gen_server:cast(Pid, delete),
         receive {'DOWN', MRef, _, _, _} -> ok end,
-        rem_from_ets(DbName, Sig, DDocId, Pid)
+        rem_from_ets(DbName, Sig, DDocId, Pid),
+        couch_db_monitor:closed(MonStateAcc, SysOwned)
     end,
-    lists:foreach(Fun, ets:lookup(?BY_DB, DbName)),
+    NewMonState = lists:foldl(Fun, MonState, ets:lookup(?BY_DB, DbName)),
     Path = couch_index_util:index_dir("", DbName),
-    couch_file:nuke_dir(Root, Path).
-
-
-add_to_ets(DbName, Sig, DDocId, Pid) ->
-    ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
-    ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
-    ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
+    couch_file:nuke_dir(Root, Path),
+    State#st{monitor_state=NewMonState}.
 
 
 rem_from_ets(DbName, Sig, DDocId, Pid) ->
-    ets:delete(?BY_SIG, {DbName, Sig}),
-    ets:delete(?BY_PID, Pid),
+    couch_db_monitor:delete(?MODULE, {DbName, Sig}, Pid),
     ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}).
 
 
@@ -253,10 +302,10 @@ handle_db_event(DbName, deleted, St) ->
     {ok, St};
 handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
     lists:foreach(fun({_DbName, {_DDocId, Sig}}) ->
-        case ets:lookup(?BY_SIG, {DbName, Sig}) of
-            [{_, IndexPid}] ->
+        case couch_db_monitor:lookup(?MODULE, name, {DbName, Sig}) of
+            {ok, {IndexPid, _Monitor, _SysOwned}} ->
                 (catch gen_server:cast(IndexPid, ddoc_updated));
-            [] ->
+            not_found ->
                 ok
         end
     end, ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}})),