You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by bb...@apache.org on 2017/04/14 07:14:51 UTC
[couchdb] 03/04: Use LRU for view indexes
This is an automated email from the ASF dual-hosted git repository.
bbastian pushed a commit to branch 8409-view-lru
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 4dfd8fbaf6f10ffdf7fcef10e3d6075e5e9440b8
Author: Benjamin Bastian <be...@gmail.com>
AuthorDate: Wed Mar 29 14:44:23 2017 -0700
Use LRU for view indexes
---
src/couch_index/priv/stats_descriptions.cfg | 20 ++
src/couch_index/src/couch_index.erl | 16 +-
src/couch_index/src/couch_index_compactor.erl | 3 +
src/couch_index/src/couch_index_server.erl | 180 ++++++++++++----
.../test/couch_index_compaction_tests.erl | 7 +-
src/couch_index/test/couch_index_lru_tests.erl | 226 +++++++++++++++++++++
6 files changed, 408 insertions(+), 44 deletions(-)
diff --git a/src/couch_index/priv/stats_descriptions.cfg b/src/couch_index/priv/stats_descriptions.cfg
new file mode 100644
index 0000000..aac317f
--- /dev/null
+++ b/src/couch_index/priv/stats_descriptions.cfg
@@ -0,0 +1,20 @@
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
+
+% Style guide for descriptions: Start with a lowercase letter & do not add
+% a trailing full-stop / period
+% Please keep this in alphabetical order
+
+{[couchdb, couch_index_server, lru_skip], [
+ {type, counter},
+ {desc, <<"number of couch_index_server LRU operations skipped">>}
+]}.
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl
index b339010..1f4fbf4 100644
--- a/src/couch_index/src/couch_index.erl
+++ b/src/couch_index/src/couch_index.erl
@@ -230,8 +230,11 @@ handle_cast({new_state, NewIdxState}, State) ->
couch_log:debug("Updated index for db: ~s idx: ~s seq: ~B", Args),
Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState),
case State#st.committed of
- true -> erlang:send_after(commit_delay(), self(), commit);
- false -> ok
+ true ->
+ ok = couch_index_server:set_committing(self(), true),
+ erlang:send_after(commit_delay(), self(), commit);
+ false ->
+ ok
end,
{noreply, State#st{
idx_state=NewIdxState,
@@ -287,6 +290,7 @@ handle_info(commit, State) ->
% Commit the updates
ok = Mod:commit(IdxState),
couch_event:notify(DbName, {index_commit, IdxName}),
+ ok = couch_index_server:set_committing(self(), false),
{noreply, State#st{committed=true}};
_ ->
% We can't commit the header because the database seq that's
@@ -295,6 +299,7 @@ handle_info(commit, State) ->
% forever out of sync with the database. But a crash before we
% commit these changes, no big deal, we only lose incremental
% changes since last committal.
+ ok = couch_index_server:set_committing(self(), true),
erlang:send_after(commit_delay(), self(), commit),
{noreply, State}
end;
@@ -375,8 +380,11 @@ commit_compacted(NewIdxState, State) ->
false -> ok
end,
case State#st.committed of
- true -> erlang:send_after(commit_delay(), self(), commit);
- false -> ok
+ true ->
+ ok = couch_index_server:set_committing(self(), true),
+ erlang:send_after(commit_delay(), self(), commit);
+ false ->
+ ok
end,
State#st{
idx_state=NewIdxState1,
diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl
index b5db058..4f71ccf 100644
--- a/src/couch_index/src/couch_index_compactor.erl
+++ b/src/couch_index/src/couch_index_compactor.erl
@@ -62,12 +62,14 @@ handle_call({compact, _}, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
{reply, {ok, Pid}, State};
handle_call({compact, IdxState}, _From, #st{idx=Idx}=State) ->
Pid = spawn_link(fun() -> compact(Idx, State#st.mod, IdxState) end),
+ ok = couch_index_server:set_compacting(Idx, true),
{reply, {ok, Pid}, State#st{pid=Pid}};
handle_call(cancel, _From, #st{pid=undefined}=State) ->
{reply, ok, State};
handle_call(cancel, _From, #st{pid=Pid}=State) ->
unlink(Pid),
exit(Pid, kill),
+ ok = couch_index_server:set_compacting(State#st.idx, false),
{reply, ok, State#st{pid=undefined}};
handle_call(is_running, _From, #st{pid=Pid}=State) when is_pid(Pid) ->
{reply, true, State};
@@ -80,6 +82,7 @@ handle_cast(_Mesg, State) ->
handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
+ ok = couch_index_server:set_compacting(State#st.idx, false),
{noreply, State#st{pid=undefined}};
handle_info({'EXIT', _Pid, normal}, State) ->
{noreply, State};
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 4e86f5e..d4f35cb 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -16,7 +16,8 @@
-vsn(2).
--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
+-export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2, close/1]).
+-export([set_committing/2, set_compacting/2]).
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -34,8 +35,76 @@
-define(BY_PID, couchdb_indexes_by_pid).
-define(BY_DB, couchdb_indexes_by_db).
-define(RELISTEN_DELAY, 5000).
+-define(MAX_INDICES_OPEN, 500).
+
+-record(st, {
+ lru=couch_lru:new(fun maybe_close_index/1),
+ open=0,
+ max_open=?MAX_INDICES_OPEN,
+ root_dir
+}).
+
+-record(entry, {
+ name,
+ pid,
+ locked=false,
+ committing=false,
+ compacting=false,
+ waiters=undefined
+}).
+
+close(Mon) ->
+ erlang:demonitor(Mon, [flush]),
+ ok.
+
+maybe_close_lru_view(#st{open=Open, max_open=Max}=State) when Open =< Max ->
+ {ok, State};
+maybe_close_lru_view(State) ->
+ #st{lru=Lru, open=Open} = State,
+ case couch_lru:close(Lru) of
+ false ->
+ {ok, State};
+ {true, NewLru} ->
+ maybe_close_lru_view(State#st{lru=NewLru, open=Open-1})
+ end.
+
+is_idle(Pid) ->
+ case erlang:process_info(Pid, monitored_by) of
+ undefined ->
+ true;
+ {monitored_by, Pids} ->
+ [] =:= Pids -- [whereis(couch_stats_process_tracker)]
+ end.
--record(st, {root_dir}).
+set_compacting(Idx, IsCompacting) ->
+ gen_server:call(?MODULE, {compacting, Idx, IsCompacting}, infinity).
+
+set_committing(Pid, IsCommitting) ->
+ gen_server:call(?MODULE, {committing, Pid, IsCommitting}, infinity).
+
+maybe_close_index({DbName, DDocId, Sig}) ->
+ case ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, true}) of
+ true ->
+ case ets:lookup(?BY_SIG, {DbName, Sig}) of
+ [#entry{pid=Pid, committing=false, compacting=false}] ->
+ case is_idle(Pid) of
+ true ->
+ rem_from_ets(DbName, Sig, DDocId, Pid),
+ couch_index:stop(Pid),
+ {true, true};
+ false ->
+ ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, false}),
+ couch_stats:increment_counter([couchdb, couch_index_server, lru_skip]),
+ {false, false}
+ end;
+ _ ->
+ ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.locked, false}),
+ couch_stats:increment_counter([couchdb, couch_index_server, lru_skip]),
+ {false, false}
+ end;
+ false ->
+ {false, true}
+ end.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -94,8 +163,8 @@ get_index(Module, Db, DDoc, Fun) when is_binary(DDoc) ->
get_index(Module, Db, DDoc, Fun) when is_function(Fun, 1) ->
{ok, InitState} = Module:init(Db, DDoc),
{ok, FunResp} = Fun(InitState),
- {ok, Pid} = get_index(Module, InitState),
- {ok, Pid, FunResp};
+ {ok, Pid, Monitor} = get_index(Module, InitState),
+ {ok, Pid, Monitor, FunResp};
get_index(Module, Db, DDoc, _Fun) ->
{ok, InitState} = Module:init(Db, DDoc),
get_index(Module, InitState).
@@ -105,24 +174,31 @@ get_index(Module, IdxState) ->
DbName = Module:get(db_name, IdxState),
Sig = Module:get(signature, IdxState),
case ets:lookup(?BY_SIG, {DbName, Sig}) of
- [{_, Pid}] when is_pid(Pid) ->
- {ok, Pid};
+ [#entry{pid=Pid, locked=false}] when is_pid(Pid) ->
+ Monitor = erlang:monitor(process, Pid),
+ {ok, Pid, Monitor};
_ ->
Args = {Module, IdxState, DbName, Sig},
- gen_server:call(?MODULE, {get_index, Args}, infinity)
+ case gen_server:call(?MODULE, {get_index, Args}, infinity) of
+ {ok, Pid} ->
+ Monitor = erlang:monitor(process, Pid),
+ {ok, Pid, Monitor};
+ {error, Reason} ->
+ {error, Reason}
+ end
end.
-
init([]) ->
process_flag(trap_exit, true),
ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
- ets:new(?BY_SIG, [protected, set, named_table]),
- ets:new(?BY_PID, [private, set, named_table]),
+ ets:new(?BY_SIG, [protected, set, named_table, {keypos, #entry.name}]),
+ ets:new(?BY_PID, [protected, set, named_table]),
ets:new(?BY_DB, [protected, bag, named_table]),
couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
RootDir = couch_index_util:root_dir(),
+ MaxIndicesOpen = config:get_integer("couchdb", "max_indices_open", ?MAX_INDICES_OPEN),
couch_file:init_delete_dir(RootDir),
- {ok, #st{root_dir=RootDir}}.
+ {ok, #st{root_dir=RootDir, max_open=MaxIndicesOpen}}.
terminate(_Reason, _State) ->
@@ -134,47 +210,69 @@ terminate(_Reason, _State) ->
handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
case ets:lookup(?BY_SIG, {DbName, Sig}) of
[] ->
+ {ok, NewState} = maybe_close_lru_view(State#st{open=(State#st.open)+1}),
spawn_link(fun() -> new_index(Args) end),
- ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
+ ets:insert(?BY_SIG, #entry{name={DbName, Sig}, waiters=[From]}),
+ {noreply, NewState};
+ [#entry{waiters=Waiters}=Entry] when is_list(Waiters) ->
+ ets:insert(?BY_SIG, Entry#entry{waiters=[From | Waiters]}),
{noreply, State};
- [{_, Waiters}] when is_list(Waiters) ->
- ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
- {noreply, State};
- [{_, Pid}] when is_pid(Pid) ->
+ [#entry{pid=Pid}] when is_pid(Pid) ->
{reply, {ok, Pid}, State}
end;
handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
- [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+ [#entry{waiters=Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+ NewLru = couch_lru:insert({DbName, DDocId, Sig}, State#st.lru),
[gen_server:reply(From, {ok, Pid}) || From <- Waiters],
link(Pid),
add_to_ets(DbName, Sig, DDocId, Pid),
- {reply, ok, State};
+ {reply, ok, State#st{lru=NewLru}};
handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
- [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+ [#entry{waiters=Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
[gen_server:reply(From, Error) || From <- Waiters],
ets:delete(?BY_SIG, {DbName, Sig}),
+ {reply, ok, State#st{open=(State#st.open)-1}};
+handle_call({compacting, Pid, IsCompacting}, _From, State) ->
+ case ets:lookup(?BY_PID, Pid) of
+ [{Pid, {DbName, Sig}}] ->
+ ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.compacting, IsCompacting});
+ [] ->
+ ok
+ end,
+ {reply, ok, State};
+handle_call({committing, Pid, IsCommitting}, _From, State) ->
+ case ets:lookup(?BY_PID, Pid) of
+ [{Pid, {DbName, Sig}}] ->
+ ets:update_element(?BY_SIG, {DbName, Sig}, {#entry.committing, IsCommitting});
+ [] ->
+ ok
+ end,
{reply, ok, State};
handle_call({reset_indexes, DbName}, _From, State) ->
reset_indexes(DbName, State#st.root_dir),
- {reply, ok, State}.
+ {reply, ok, State};
+handle_call(get_open_count, _From, State) ->
+ {reply, State#st.open, State}.
handle_cast({reset_indexes, DbName}, State) ->
reset_indexes(DbName, State#st.root_dir),
- {noreply, State}.
+ {noreply, State};
+handle_cast(close_indexes, State) ->
+ {ok, NewState} = maybe_close_lru_view(State),
+ {noreply, NewState}.
-handle_info({'EXIT', Pid, Reason}, Server) ->
- case ets:lookup(?BY_PID, Pid) of
+handle_info({'EXIT', Pid, Reason}, State) ->
+ NewState = case ets:lookup(?BY_PID, Pid) of
[{Pid, {DbName, Sig}}] ->
- [{DbName, {DDocId, Sig}}] =
- ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
- rem_from_ets(DbName, Sig, DDocId, Pid);
+ rem_from_ets(DbName, Sig, Pid),
+ State#st{open=(State#st.open)-1};
[] when Reason /= normal ->
exit(Reason);
_Else ->
- ok
+ State
end,
- {noreply, Server};
+ {noreply, NewState};
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
{noreply, State};
@@ -187,18 +285,20 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) ->
- {ok, RootDir};
-handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) ->
- {ok, RootDir};
+handle_config_change("couchdb", "index_dir", RootDir, _, State) ->
+ {ok, State#st{root_dir=RootDir}};
+handle_config_change("couchdb", "view_index_dir", RootDir, _, State) ->
+ {ok, State#st{root_dir=RootDir}};
handle_config_change("couchdb", "index_dir", _, _, _) ->
exit(whereis(couch_index_server), config_change),
remove_handler;
handle_config_change("couchdb", "view_index_dir", _, _, _) ->
exit(whereis(couch_index_server), config_change),
remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
- {ok, RootDir}.
+handle_config_change("couchdb", "max_indices_open", Max, _, State) ->
+ {ok, State#st{max_open=list_to_integer(Max)}};
+handle_config_change(_, _, _, _, State) ->
+ {ok, State}.
handle_config_terminate(_, stop, _) ->
ok;
@@ -222,7 +322,7 @@ new_index({Mod, IdxState, DbName, Sig}) ->
reset_indexes(DbName, Root) ->
% shutdown all the updaters and clear the files, the db got changed
Fun = fun({_, {DDocId, Sig}}) ->
- [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+ [#entry{pid=Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
MRef = erlang:monitor(process, Pid),
gen_server:cast(Pid, delete),
receive {'DOWN', MRef, _, _, _} -> ok end,
@@ -234,11 +334,17 @@ reset_indexes(DbName, Root) ->
add_to_ets(DbName, Sig, DDocId, Pid) ->
- ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
+ ets:insert(?BY_SIG, #entry{name={DbName, Sig}, pid=Pid}),
ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
+rem_from_ets(DbName, Sig, Pid) ->
+ [{DbName, {DDocId, Sig}}] =
+ ets:match_object(?BY_DB, {DbName, {'$1', Sig}}),
+ rem_from_ets(DbName, Sig, DDocId, Pid).
+
+
rem_from_ets(DbName, Sig, DDocId, Pid) ->
ets:delete(?BY_SIG, {DbName, Sig}),
ets:delete(?BY_PID, Pid),
@@ -254,7 +360,7 @@ handle_db_event(DbName, deleted, St) ->
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(fun({_DbName, {_DDocId, Sig}}) ->
case ets:lookup(?BY_SIG, {DbName, Sig}) of
- [{_, IndexPid}] ->
+ [#entry{pid=IndexPid}] ->
(catch gen_server:cast(IndexPid, ddoc_updated));
[] ->
ok
diff --git a/src/couch_index/test/couch_index_compaction_tests.erl b/src/couch_index/test/couch_index_compaction_tests.erl
index 0787151..6cbffc8 100644
--- a/src/couch_index/test/couch_index_compaction_tests.erl
+++ b/src/couch_index/test/couch_index_compaction_tests.erl
@@ -19,9 +19,9 @@ setup() ->
DbName = ?tempdb(),
{ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
couch_db:close(Db),
- {ok, IndexerPid} = fake_index(Db),
+ {ok, IndexerPid, Mon} = fake_index(Db),
?assertNot(is_opened(Db)),
- {Db, IndexerPid}.
+ {Db, IndexerPid, Mon}.
fake_index(#db{name = DbName} = Db) ->
ok = meck:new([test_index], [non_strict]),
@@ -67,7 +67,7 @@ compaction_test_() ->
}.
-hold_db_for_recompaction({Db, Idx}) ->
+hold_db_for_recompaction({Db, Idx, Mon}) ->
?_test(begin
?assertNot(is_opened(Db)),
ok = meck:reset(test_index),
@@ -87,6 +87,7 @@ hold_db_for_recompaction({Db, Idx}) ->
end,
?assertNot(is_opened(Db)),
+ couch_index_server:close(Mon),
ok
end).
diff --git a/src/couch_index/test/couch_index_lru_tests.erl b/src/couch_index/test/couch_index_lru_tests.erl
new file mode 100644
index 0000000..52f4244
--- /dev/null
+++ b/src/couch_index/test/couch_index_lru_tests.erl
@@ -0,0 +1,226 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_lru_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(MAX_INDICES_OPEN, 10).
+
+-record(test_idx, {
+ db_name,
+ idx_name,
+ signature
+}).
+
+
+setup() ->
+ test_util:start_couch([]),
+ DbName = ?tempdb(),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+ config:set("couchdb", "max_indices_open", integer_to_list(?MAX_INDICES_OPEN)),
+ Db.
+
+
+teardown(Db) ->
+ ok = couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ config:delete("couchdb", "max_indices_open"),
+ (catch couch_db:close(Db)),
+ ok.
+
+
+lru_test_() ->
+ {
+ "Test the view index LRU",
+ {
+ setup,
+ fun() -> test_util:start_couch([]) end, fun test_util:stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun test_close_while_compacting/1,
+ fun test_soft_max/1
+ ]
+ }
+ }
+ }.
+
+
+test_close_while_compacting(Db) ->
+ ?_test(begin
+ ok = meck:new([couch_index_server], [passthrough]),
+ Self = self(),
+ ok = meck:expect(couch_index_server, set_compacting, fun(Idx, IsCompacting) ->
+ meck:passthrough([Idx, IsCompacting]),
+ Self ! {compact, IsCompacting, self()},
+ receive finish ->
+ ok
+ end,
+ ok
+ end),
+
+ ok = meck:expect(couch_index_server, set_committing, fun(Idx, IsCommitting) ->
+ meck:passthrough([Idx, IsCommitting]),
+ Self ! {commit, IsCommitting, self()},
+ ok
+ end),
+
+ % create ddocs
+ DDocIds = lists:map(fun(I) ->
+ BI = integer_to_binary(I),
+ <<"_design/ddoc_", BI/binary>>
+ end, lists:seq(1,?MAX_INDICES_OPEN+10)),
+ ok = create_ddocs(Db, DDocIds),
+
+ % open and compact indexes
+ Openers = lists:map(fun(DDocId) ->
+ spawn_link(fun() ->
+ {ok, Pid, Mon} = couch_index_server:get_index(couch_mrview_index, Db#db.name, DDocId),
+ couch_index:compact(Pid),
+ receive close ->
+ ok
+ end,
+ couch_index_server:close(Mon)
+ end)
+ end, DDocIds),
+
+ % check that all indexes are open
+ ToClose = wait_all_compacting(true, [], ?MAX_INDICES_OPEN+10),
+ ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+ % close compactor pids, but still block flag from being set in BY_SIG table
+ lists:foreach(fun(Opener) -> Opener ! close end, Openers),
+ % check that compaction flag block pids from closing
+ gen_server:cast(couch_index_server, close_indexes),
+ ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+ % allow compaction flag to be unset
+ lists:foreach(fun(CPid) -> CPid ! finish end, ToClose),
+ % wait until all compaction flags are unset
+ Finished = wait_all_compacting(false, [], ?MAX_INDICES_OPEN+10),
+ lists:foreach(fun(CPid) -> CPid ! finish end, Finished),
+ gen_server:cast(couch_index_server, close_indexes),
+ ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+ % wait for all commits to start
+ Indexers = wait_all_committing(dict:new(), true, ?MAX_INDICES_OPEN+10),
+ ?assertEqual(?MAX_INDICES_OPEN+10, gen_server:call(couch_index_server, get_open_count)),
+ % force premature commit
+ [Indexer ! commit || Indexer <- Indexers],
+ % wait until commits happen
+ wait_all_committing(dict:new(), false, ?MAX_INDICES_OPEN+10),
+ gen_server:cast(couch_index_server, close_indexes),
+ % since all commits and all compacts are done, make sure indexes are closed
+ ?assertEqual(?MAX_INDICES_OPEN, gen_server:call(couch_index_server, get_open_count)),
+ % clean up
+ (catch meck:unload(couch_index_server)),
+ ok
+ end).
+
+
+test_soft_max(Db) ->
+ ?_test(begin
+ ok = meck:new([test_index], [non_strict]),
+ ok = meck:expect(test_index, init, fun(Db0, DDoc) ->
+ Sig = couch_crypto:hash(md5, term_to_binary({Db0#db.name, DDoc})),
+ {ok, #test_idx{db_name=Db0#db.name, idx_name=DDoc, signature=Sig}}
+ end),
+ ok = meck:expect(test_index, close, ['_'], {true, true}),
+ ok = meck:expect(test_index, open, fun(_Db, State) ->
+ {ok, State}
+ end),
+ ok = meck:expect(test_index, compact, ['_', '_', '_'],
+ meck:seq([{ok, 9}, {ok, 10}])), %% to trigger recompaction
+ ok = meck:expect(test_index, commit, ['_'], ok),
+ ok = meck:expect(test_index, get, fun
+ (db_name, State) ->
+ State#test_idx.db_name;
+ (idx_name, State) ->
+ State#test_idx.idx_name;
+ (signature, State) ->
+ State#test_idx.signature;
+ (update_seq, Seq) ->
+ Seq
+ end),
+
+ ok = meck:reset(test_index),
+
+ IdxOpens = lists:map(fun(I) ->
+ BI = integer_to_binary(I),
+ % hack: use tuple as index name so couch_index_server won't try to open
+ % it as a design document.
+ IndexName = {<<"_design/i", BI/binary>>},
+ ?assertEqual(I-1, gen_server:call(couch_index_server, get_open_count)),
+ couch_index_server:get_index(test_index, Db, IndexName)
+ end, lists:seq(1, 500)),
+
+ lists:foldl(fun(IdxOpen, Acc) ->
+ ?assertMatch({ok, _, _}, IdxOpen),
+ {ok, Pid, Mon} = IdxOpen,
+ ?assert(is_pid(Pid)),
+ ?assert(is_reference(Mon)),
+ ?assertNotEqual(undefined, process_info(Pid)),
+ gen_server:cast(couch_index_server, close_indexes),
+ OpenCount = gen_server:call(couch_index_server, get_open_count),
+ ?assertEqual(max(?MAX_INDICES_OPEN, Acc), OpenCount),
+ couch_index_server:close(Mon),
+ Acc-1
+ end, 500, IdxOpens),
+
+ config:delete("couchdb", "max_indices_open"),
+ (catch meck:unload(test_index)),
+ (catch meck:unload(couch_util)),
+ ok
+ end).
+
+
+wait_all_compacting(_IsCompacting, Acc, 0) ->
+ Acc;
+wait_all_compacting(IsCompacting, Acc, Remaining) ->
+ receive {compact, IsCompacting, From} ->
+ wait_all_compacting(IsCompacting, [From | Acc], Remaining-1)
+ end.
+
+
+wait_all_committing(Pids, ShouldBe, Count) ->
+ receive {commit, IsCommitting, From} ->
+ Pids0 = dict:store(From, IsCommitting, Pids),
+ CommitCount = dict:fold(fun(_K, V, Acc) ->
+ case V of
+ ShouldBe -> Acc+1;
+ _ -> Acc
+ end
+ end, 0, Pids0),
+ case Count =:= CommitCount of
+ true ->
+ [Pid || {Pid, _} <- dict:to_list(Pids0)];
+ false ->
+ wait_all_committing(Pids0, ShouldBe, Count)
+ end
+ end.
+
+
+create_ddocs(Db, DDocIds) ->
+ Docs = lists:map(fun(DDocId) ->
+ MapFun = <<"function(doc) {emit(\"", DDocId/binary, "\", 1);}">>,
+ Json = {[
+ {<<"_id">>, DDocId},
+ {<<"language">>, <<"javascript">>},
+ {<<"views">>, {[
+ {<<"v">>, {[
+ {<<"map">>, MapFun}
+ ]}}
+ ]}}
+ ]},
+ couch_doc:from_json_obj(Json)
+ end, DDocIds),
+ {ok, _} = couch_db:update_docs(Db, Docs, [?ADMIN_CTX]),
+ ok.
--
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.