Posted to commits@couchdb.apache.org by rn...@apache.org on 2021/12/02 22:32:14 UTC

[couchdb] 02/02: Add sharding to couch_index_server

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch sharded_couch_index_server
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 253c9dc25bd6c2857d0cc6bdb0caf56291a91728
Author: Robert Newson <rn...@apache.org>
AuthorDate: Tue Nov 30 15:39:58 2021 +0000

    Add sharding to couch_index_server
---
 src/chttpd/src/chttpd_node.erl                     |   3 +-
 src/couch/src/couch_secondary_sup.erl              |  20 +-
 src/couch_index/src/couch_index.app.src            |   2 +-
 src/couch_index/src/couch_index_server.erl         | 206 +++++++++++++--------
 .../test/eunit/couch_index_ddoc_updated_tests.erl  |  10 +-
 .../test/eunit/couch_mrview_ddoc_updated_tests.erl |  10 +-
 6 files changed, 157 insertions(+), 94 deletions(-)

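The change below replaces the single registered couch_index_server (and its three globally named ETS tables) with one gen_server per scheduler, each owning its own by_sig/by_pid/by_db tables; requests are routed by hashing the database name onto 1..num_servers(). A condensed sketch of that routing, mirroring the name/2 helper added in couch_index_server.erl (the wrapper module and its name here are for illustration only, not part of the commit):

    %% index_routing_sketch.erl - illustration only, not part of this commit.
    %% Mirrors the name/2 helper added to couch_index_server.erl below.
    -module(index_routing_sketch).
    -export([server_for/1]).

    num_servers() ->
        %% one index server per scheduler, as in couch_index_server:num_servers/0
        erlang:system_info(schedulers).

    %% A database name (list or binary) is hashed onto 1..num_servers(), so
    %% every request for the same database always reaches the same shard.
    server_for(DbName) when is_list(DbName) ->
        server_for(list_to_binary(DbName));
    server_for(DbName) when is_binary(DbName) ->
        N = 1 + erlang:phash2(DbName, num_servers()),
        list_to_atom("index_server_" ++ integer_to_list(N)).
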
diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl
index 7379dba..63a7fb1 100644
--- a/src/chttpd/src/chttpd_node.erl
+++ b/src/chttpd/src/chttpd_node.erl
@@ -282,7 +282,8 @@ get_stats() ->
     MessageQueues0 = [
         {couch_file, {CF}},
         {couch_db_updater, {CDU}},
-        {couch_server, couch_server:aggregate_queue_len()}
+        {couch_server, couch_server:aggregate_queue_len()},
+        {index_server, couch_index_server:aggregate_queue_len()}
     ],
     MessageQueues = MessageQueues0 ++ message_queues(registered()),
     {SQ, DCQ} = run_queues(),
diff --git a/src/couch/src/couch_secondary_sup.erl b/src/couch/src/couch_secondary_sup.erl
index a328c17..3eab831 100644
--- a/src/couch/src/couch_secondary_sup.erl
+++ b/src/couch/src/couch_secondary_sup.erl
@@ -22,12 +22,12 @@ init([]) ->
         {couch_plugin_event, {gen_event, start_link, [{local, couch_plugin}]}, permanent,
             brutal_kill, worker, dynamic}
     ],
-    Daemons = [
-        {index_server, {couch_index_server, start_link, []}},
-        {query_servers, {couch_proc_manager, start_link, []}},
-        {vhosts, {couch_httpd_vhost, start_link, []}},
-        {uuids, {couch_uuids, start, []}}
-    ],
+    Daemons =
+        [
+            {query_servers, {couch_proc_manager, start_link, []}},
+            {vhosts, {couch_httpd_vhost, start_link, []}},
+            {uuids, {couch_uuids, start, []}}
+        ] ++ couch_index_servers(),
 
     MaybeHttp =
         case http_enabled() of
@@ -69,3 +69,11 @@ https_enabled() ->
     LegacySSLEnabled = LegacySSL =:= "{chttpd, start_link, [https]}",
 
     SSLEnabled orelse LegacySSLEnabled.
+
+couch_index_servers() ->
+    N = couch_index_server:num_servers(),
+    [couch_index_server(I) || I <- lists:seq(1, N)].
+
+couch_index_server(N) ->
+    Name = couch_index_server:couch_index_server(N),
+    {Name, {couch_index_server, start_link, [N]}}.
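
With this change couch_secondary_sup no longer starts a single index_server child; couch_index_servers/0 generates one child spec per scheduler. For illustration only (assuming a node where couch_index_server:num_servers() returns 4), the generated specs would look like:

    [{index_server_1, {couch_index_server, start_link, [1]}},
     {index_server_2, {couch_index_server, start_link, [2]}},
     {index_server_3, {couch_index_server, start_link, [3]}},
     {index_server_4, {couch_index_server, start_link, [4]}}]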
diff --git a/src/couch_index/src/couch_index.app.src b/src/couch_index/src/couch_index.app.src
index 3aa92ba..834be3f 100644
--- a/src/couch_index/src/couch_index.app.src
+++ b/src/couch_index/src/couch_index.app.src
@@ -13,7 +13,7 @@
 {application, couch_index, [
     {description, "CouchDB Secondary Index Manager"},
     {vsn, git},
-    {registered, [couch_index_server]},
+    {registered, []},
     {applications, [kernel, stdlib, couch_epi]},
     {mod, {couch_index_app, []}}
 ]}.
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 77f91cc..9031678 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -16,11 +16,15 @@
 
 -vsn(2).
 
--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
+-export([start_link/1, validate/2, get_index/4, get_index/3, get_index/2]).
 
 -export([init/1, terminate/2, code_change/3]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 
+% Sharding functions
+-export([num_servers/0, couch_index_server/1, by_sig/1, by_pid/1, by_db/1]).
+-export([aggregate_queue_len/0]).
+
 % Exported for callbacks
 -export([
     handle_config_change/5,
@@ -30,15 +34,18 @@
 
 -include_lib("couch/include/couch_db.hrl").
 
--define(BY_SIG, couchdb_indexes_by_sig).
--define(BY_PID, couchdb_indexes_by_pid).
--define(BY_DB, couchdb_indexes_by_db).
 -define(RELISTEN_DELAY, 5000).
 
--record(st, {root_dir}).
+-record(st, {
+    root_dir,
+    index_server,
+    by_sig,
+    by_pid,
+    by_db
+}).
 
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(N) ->
+    gen_server:start_link({local, couch_index_server(N)}, ?MODULE, [N], []).
 
 validate(Db, DDoc) ->
     LoadModFun = fun
@@ -101,90 +108,97 @@ get_index(Module, Db, DDoc, _Fun) ->
 get_index(Module, IdxState) ->
     DbName = Module:get(db_name, IdxState),
     Sig = Module:get(signature, IdxState),
-    case ets:lookup(?BY_SIG, {DbName, Sig}) of
+    case ets:lookup(by_sig(DbName), {DbName, Sig}) of
         [{_, Pid}] when is_pid(Pid) ->
             DDocId = Module:get(idx_name, IdxState),
-            case ets:match_object(?BY_DB, {DbName, {DDocId, Sig}}) of
+            case ets:match_object(by_db(DbName), {DbName, {DDocId, Sig}}) of
                 [] ->
                     Args = [Pid, DbName, DDocId, Sig],
-                    gen_server:cast(?MODULE, {add_to_ets, Args});
+                    gen_server:cast(couch_index_server(DbName), {add_to_ets, Args});
                 _ ->
                     ok
             end,
             {ok, Pid};
         _ ->
             Args = {Module, IdxState, DbName, Sig},
-            gen_server:call(?MODULE, {get_index, Args}, infinity)
+            gen_server:call(couch_index_server(DbName), {get_index, Args}, infinity)
     end.
 
-init([]) ->
+init([N]) ->
     process_flag(trap_exit, true),
-    ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
-    ets:new(?BY_SIG, [protected, set, named_table]),
-    ets:new(?BY_PID, [private, set, named_table]),
-    ets:new(?BY_DB, [protected, bag, named_table]),
-    couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
+    ets:new(by_sig(N), [protected, set, named_table]),
+    ets:new(by_pid(N), [private, set, named_table]),
+    ets:new(by_db(N), [protected, bag, named_table]),
     RootDir = couch_index_util:root_dir(),
     couch_file:init_delete_dir(RootDir),
-    {ok, #st{root_dir = RootDir}}.
+    St = #st{
+        root_dir = RootDir,
+        index_server = couch_index_server(N),
+        by_sig = by_sig(N),
+        by_pid = by_pid(N),
+        by_db = by_db(N)
+    },
+    ok = config:listen_for_changes(?MODULE, St),
+    couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]),
+    {ok, St}.
 
-terminate(_Reason, _State) ->
-    Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)],
+terminate(_Reason, State) ->
+    Pids = [Pid || {Pid, _} <- ets:tab2list(State#st.by_pid)],
     lists:map(fun couch_util:shutdown_sync/1, Pids),
     ok.
 
 handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
-    case ets:lookup(?BY_SIG, {DbName, Sig}) of
+    case ets:lookup(State#st.by_sig, {DbName, Sig}) of
         [] ->
             spawn_link(fun() -> new_index(Args) end),
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
+            ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}),
             {noreply, State};
         [{_, Waiters}] when is_list(Waiters) ->
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
+            ets:insert(State#st.by_sig, {{DbName, Sig}, [From | Waiters]}),
             {noreply, State};
         [{_, Pid}] when is_pid(Pid) ->
             {reply, {ok, Pid}, State}
     end;
 handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
     [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
     link(Pid),
-    add_to_ets(DbName, Sig, DDocId, Pid),
+    add_to_ets(DbName, Sig, DDocId, Pid, State),
     {reply, ok, State};
 handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
     [gen_server:reply(From, Error) || From <- Waiters],
-    ets:delete(?BY_SIG, {DbName, Sig}),
+    ets:delete(State#st.by_sig, {DbName, Sig}),
     {reply, ok, State};
 handle_call({reset_indexes, DbName}, _From, State) ->
-    reset_indexes(DbName, State#st.root_dir),
+    reset_indexes(DbName, State),
     {reply, ok, State}.
 
 handle_cast({reset_indexes, DbName}, State) ->
-    reset_indexes(DbName, State#st.root_dir),
+    reset_indexes(DbName, State),
     {noreply, State};
 handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
     % check if Pid still exists
-    case ets:lookup(?BY_PID, Pid) of
+    case ets:lookup(State#st.by_pid, Pid) of
         [{Pid, {DbName, Sig}}] when is_pid(Pid) ->
-            ets:insert(?BY_DB, {DbName, {DDocId, Sig}});
+            ets:insert(State#st.by_db, {DbName, {DDocId, Sig}});
         _ ->
             ok
     end,
     {noreply, State};
 handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
-    ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}),
+    ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}),
     {noreply, State}.
 
 handle_info({'EXIT', Pid, Reason}, Server) ->
-    case ets:lookup(?BY_PID, Pid) of
+    case ets:lookup(Server#st.by_pid, Pid) of
         [{Pid, {DbName, Sig}}] ->
             DDocIds = [
                 DDocId
              || {_, {DDocId, _}} <-
-                    ets:match_object(?BY_DB, {DbName, {'$1', Sig}})
+                    ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
             ],
-            rem_from_ets(DbName, Sig, DDocIds, Pid);
+            rem_from_ets(DbName, Sig, DDocIds, Pid, Server);
         [] when Reason /= normal ->
             exit(Reason);
         _Else ->
@@ -201,23 +215,23 @@ handle_info(Msg, State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
-handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
-handle_config_change("couchdb", "index_dir", _, _, _) ->
-    exit(whereis(couch_index_server), config_change),
+handle_config_change("couchdb", "index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
+    {ok, St};
+handle_config_change("couchdb", "view_index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
+    {ok, St};
+handle_config_change("couchdb", "index_dir", _, _, St) ->
+    exit(whereis(St#st.index_server), config_change),
     remove_handler;
-handle_config_change("couchdb", "view_index_dir", _, _, _) ->
-    exit(whereis(couch_index_server), config_change),
+handle_config_change("couchdb", "view_index_dir", _, _, St) ->
+    exit(whereis(St#st.index_server), config_change),
     remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
-    {ok, RootDir}.
+handle_config_change(_, _, _, _, St) ->
+    {ok, St}.
 
 handle_config_terminate(_, stop, _) ->
     ok;
-handle_config_terminate(_Server, _Reason, _State) ->
-    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener),
+handle_config_terminate(_Server, _Reason, State) ->
+    erlang:send_after(?RELISTEN_DELAY, whereis(State#st.index_server), restart_config_listener),
     {ok, couch_index_util:root_dir()}.
 
 new_index({Mod, IdxState, DbName, Sig}) ->
@@ -225,26 +239,26 @@ new_index({Mod, IdxState, DbName, Sig}) ->
     case couch_index:start_link({Mod, IdxState}) of
         {ok, Pid} ->
             ok = gen_server:call(
-                ?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}}
+                couch_index_server(DbName), {async_open, {DbName, DDocId, Sig}, {ok, Pid}}
             ),
             unlink(Pid);
         Error ->
             ok = gen_server:call(
-                ?MODULE, {async_error, {DbName, DDocId, Sig}, Error}
+                couch_index_server(DbName), {async_error, {DbName, DDocId, Sig}, Error}
             )
     end.
 
-reset_indexes(DbName, Root) ->
+reset_indexes(DbName, #st{} = State) ->
     % shutdown all the updaters and clear the files, the db got changed
     SigDDocIds = lists:foldl(
         fun({_, {DDocId, Sig}}, DDict) ->
             dict:append(Sig, DDocId, DDict)
         end,
         dict:new(),
-        ets:lookup(?BY_DB, DbName)
+        ets:lookup(State#st.by_db, DbName)
     ),
     Fun = fun({Sig, DDocIds}) ->
-        [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+        [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
         unlink(Pid),
         gen_server:cast(Pid, delete),
         receive
@@ -253,34 +267,42 @@ reset_indexes(DbName, Root) ->
         after 0 ->
             ok
         end,
-        rem_from_ets(DbName, Sig, DDocIds, Pid)
+        rem_from_ets(DbName, Sig, DDocIds, Pid, State)
     end,
     lists:foreach(Fun, dict:to_list(SigDDocIds)),
     Path = couch_index_util:index_dir("", DbName),
-    couch_file:nuke_dir(Root, Path).
+    couch_file:nuke_dir(State#st.root_dir, Path).
 
-add_to_ets(DbName, Sig, DDocId, Pid) ->
-    ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
-    ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
-    ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
+add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
+    ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
+    ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
+    ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
 
-rem_from_ets(DbName, Sig, DDocIds, Pid) ->
-    ets:delete(?BY_SIG, {DbName, Sig}),
-    ets:delete(?BY_PID, Pid),
+rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
+    ets:delete(St#st.by_sig, {DbName, Sig}),
+    ets:delete(St#st.by_pid, Pid),
     lists:foreach(
         fun(DDocId) ->
-            ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}})
+            ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}})
         end,
         DDocIds
     ).
 
-handle_db_event(DbName, created, St) ->
-    gen_server:cast(?MODULE, {reset_indexes, DbName}),
+handle_db_event(DbName, Event, #st{} = St) ->
+    case couch_index_server(DbName) == St#st.index_server of
+        true ->
+            handle_db_event_int(DbName, Event, St);
+        false ->
+            {ok, St}
+    end.
+
+handle_db_event_int(DbName, created, St) ->
+    gen_server:cast(St#st.index_server, {reset_indexes, DbName}),
     {ok, St};
-handle_db_event(DbName, deleted, St) ->
-    gen_server:cast(?MODULE, {reset_indexes, DbName}),
+handle_db_event_int(DbName, deleted, St) ->
+    gen_server:cast(St#st.index_server, {reset_indexes, DbName}),
     {ok, St};
-handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
+handle_db_event_int(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
     DDocResult = couch_util:with_db(DbName, fun(Db) ->
         couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
     end),
@@ -297,15 +319,15 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
             lists:foreach(
                 fun({_DbShard, {_DDocId, Sig}}) ->
                     % check if there are other ddocs with the same Sig for the same db
-                    SigDDocs = ets:match_object(?BY_DB, {DbShard, {'$1', Sig}}),
+                    SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
                     if
                         length(SigDDocs) > 1 ->
-                            % remove records from ?BY_DB for this DDoc
+                            % remove records from by_db for this DDoc
                             Args = [DbShard, DDocId, Sig],
-                            gen_server:cast(?MODULE, {rem_from_ets, Args});
+                            gen_server:cast(St#st.index_server, {rem_from_ets, Args});
                         true ->
                             % single DDoc with this Sig - close couch_index processes
-                            case ets:lookup(?BY_SIG, {DbShard, Sig}) of
+                            case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
                                 [{_, IndexPid}] ->
                                     (catch gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
                                 [] ->
@@ -313,24 +335,56 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
                             end
                     end
                 end,
-                ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}})
+                ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
             )
         end,
         DbShards
     ),
     {ok, St};
-handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
+handle_db_event_int(DbName, {ddoc_updated, DDocId}, St) ->
     lists:foreach(
         fun({_DbName, {_DDocId, Sig}}) ->
-            case ets:lookup(?BY_SIG, {DbName, Sig}) of
+            case ets:lookup(St#st.by_sig, {DbName, Sig}) of
                 [{_, IndexPid}] ->
                     (catch gen_server:cast(IndexPid, ddoc_updated));
                 [] ->
                     ok
             end
         end,
-        ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}})
+        ets:match_object(St#st.by_db, {DbName, {DDocId, '$1'}})
     ),
     {ok, St};
-handle_db_event(_DbName, _Event, St) ->
+handle_db_event_int(_DbName, _Event, St) ->
     {ok, St}.
+
+num_servers() ->
+    erlang:system_info(schedulers).
+
+couch_index_server(Arg) ->
+    name("index_server", Arg).
+
+by_sig(Arg) ->
+    name("couchdb_indexes_by_sig", Arg).
+
+by_pid(Arg) ->
+    name("couchdb_indexes_by_pid", Arg).
+
+by_db(Arg) ->
+    name("couchdb_indexes_by_db", Arg).
+
+name(BaseName, Sig) when is_list(Sig) ->
+    name(BaseName, ?l2b(Sig));
+name(BaseName, Sig) when is_binary(Sig) ->
+    N = 1 + erlang:phash2(Sig, num_servers()),
+    name(BaseName, N);
+name(BaseName, N) when is_integer(N), N > 0 ->
+    list_to_atom(BaseName ++ "_" ++ integer_to_list(N)).
+
+aggregate_queue_len() ->
+    N = num_servers(),
+    Names = [couch_index_server(I) || I <- lists:seq(1, N)],
+    MQs = [
+        process_info(whereis(Name), message_queue_len)
+     || Name <- Names
+    ],
+    lists:sum([X || {_, X} <- MQs]).
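Callers that previously read the globally named couchdb_indexes_by_* tables now resolve the per-shard table through the exported by_sig/1, by_pid/1 and by_db/1 helpers, as the test updates below do. A minimal usage sketch, not part of the commit (the function name index_pids and the DbName/DDocId arguments are placeholders):

    %% Collect the Pids of running indexes for DDocId in DbName via the
    %% shard-aware helpers instead of the old named tables.
    index_pids(DbName, DDocId) ->
        ByDb = couch_index_server:by_db(DbName),
        BySig = couch_index_server:by_sig(DbName),
        [Pid || {_Db, {_DDoc, Sig}} <- ets:match_object(ByDb, {DbName, {DDocId, '$1'}}),
                {_, Pid} <- ets:lookup(BySig, {DbName, Sig}),
                is_pid(Pid)].
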
diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
index 7bee8ba..3a4f0e9 100644
--- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
+++ b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
@@ -85,7 +85,7 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
             DbShards
         ),
 
-        IndexesBefore = get_indexes_by_ddoc(DDocID, N),
+        IndexesBefore = get_indexes_by_ddoc(DbName, DDocID, N),
         ?assertEqual(N, length(IndexesBefore)),
 
         AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
@@ -108,7 +108,7 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
         ),
 
         ok = meck:wait(N, test_index, init, ['_', '_'], 5000),
-        IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
+        IndexesAfter = get_indexes_by_ddoc(DbName, DDocID, 0),
         ?assertEqual(0, length(IndexesAfter)),
 
         %% assert that previously running indexes are gone
@@ -137,10 +137,10 @@ fake_index() ->
     end),
     ok = meck:expect(test_index, shutdown, ['_'], ok).
 
-get_indexes_by_ddoc(DDocID, N) ->
+get_indexes_by_ddoc(DbName0, DDocID, N) ->
     Indexes = test_util:wait(fun() ->
         Indxs = ets:match_object(
-            couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}
+            couch_index_server:by_db(DbName0), {'$1', {DDocID, '$2'}}
         ),
         case length(Indxs) == N of
             true ->
@@ -151,7 +151,7 @@ get_indexes_by_ddoc(DDocID, N) ->
     end),
     lists:foldl(
         fun({DbName, {_DDocID, Sig}}, Acc) ->
-            case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+            case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of
                 [{_, Pid}] -> [Pid | Acc];
                 _ -> Acc
             end
diff --git a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
index 2a62994..36b0841 100644
--- a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
+++ b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
@@ -88,7 +88,7 @@ check_indexing_stops_on_ddoc_change(Db) ->
     ?_test(begin
         DDocID = <<"_design/bar">>,
 
-        IndexesBefore = get_indexes_by_ddoc(DDocID, 1),
+        IndexesBefore = get_indexes_by_ddoc(Db#db.name, DDocID, 1),
         ?assertEqual(1, length(IndexesBefore)),
         AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
         ?assertEqual(1, length(AliveBefore)),
@@ -127,16 +127,16 @@ check_indexing_stops_on_ddoc_change(Db) ->
         end,
 
         %% assert that previously running indexes are gone
-        IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
+        IndexesAfter = get_indexes_by_ddoc(Db#db.name, DDocID, 0),
         ?assertEqual(0, length(IndexesAfter)),
         AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
         ?assertEqual(0, length(AliveAfter))
     end).
 
-get_indexes_by_ddoc(DDocID, N) ->
+get_indexes_by_ddoc(DbName0, DDocID, N) ->
     Indexes = test_util:wait(fun() ->
         Indxs = ets:match_object(
-            couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}
+            couch_index_server:by_db(DbName0), {'$1', {DDocID, '$2'}}
         ),
         case length(Indxs) == N of
             true ->
@@ -147,7 +147,7 @@ get_indexes_by_ddoc(DDocID, N) ->
     end),
     lists:foldl(
         fun({DbName, {_DDocID, Sig}}, Acc) ->
-            case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+            case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of
                 [{_, Pid}] -> [Pid | Acc];
                 _ -> Acc
             end