You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2021/12/02 20:04:30 UTC

[couchdb] branch sharded_couch_index_server updated (2d8834c -> 5f9be0b)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch sharded_couch_index_server
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 2d8834c  Add sharding to couch_index_server
     new 5f9be0b  Add sharding to couch_index_server

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (2d8834c)
            \
             N -- N -- N   refs/heads/sharded_couch_index_server (5f9be0b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/chttpd/src/chttpd_node.erl             |  2 +-
 src/couch_index/src/couch_index_server.erl | 67 ++++++++++++------------------
 2 files changed, 28 insertions(+), 41 deletions(-)

[couchdb] 01/01: Add sharding to couch_index_server

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch sharded_couch_index_server
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5f9be0bd147cffbadf11b9c2f08defa5e46cb088
Author: Robert Newson <rn...@apache.org>
AuthorDate: Tue Nov 30 15:39:58 2021 +0000

    Add sharding to couch_index_server
---
 src/chttpd/src/chttpd_node.erl                     |   3 +-
 src/couch/src/couch_secondary_sup.erl              |  11 +-
 src/couch_index/src/couch_index.app.src            |   2 +-
 src/couch_index/src/couch_index_server.erl         | 213 +++++++++++++--------
 .../test/eunit/couch_index_ddoc_updated_tests.erl  |  10 +-
 .../test/eunit/couch_mrview_ddoc_updated_tests.erl |  10 +-
 6 files changed, 159 insertions(+), 90 deletions(-)

diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl
index e92a1e5..0f90359 100644
--- a/src/chttpd/src/chttpd_node.erl
+++ b/src/chttpd/src/chttpd_node.erl
@@ -240,7 +240,8 @@ get_stats() ->
     {{input, Input}, {output, Output}} = statistics(io),
     {CF, CDU} = db_pid_stats(),
     MessageQueues0 = [{couch_file, {CF}}, {couch_db_updater, {CDU}},
-        {couch_server, couch_server:aggregate_queue_len()}],
+        {couch_server, couch_server:aggregate_queue_len()},
+        {index_server, couch_index_server:aggregate_queue_len()}],
     MessageQueues = MessageQueues0 ++ message_queues(registered()),
     {SQ, DCQ} = run_queues(),
     [
diff --git a/src/couch/src/couch_secondary_sup.erl b/src/couch/src/couch_secondary_sup.erl
index bb78215..87d1113 100644
--- a/src/couch/src/couch_secondary_sup.erl
+++ b/src/couch/src/couch_secondary_sup.erl
@@ -27,11 +27,10 @@ init([]) ->
             dynamic}
     ],
     Daemons = [
-        {index_server, {couch_index_server, start_link, []}},
         {query_servers, {couch_proc_manager, start_link, []}},
         {vhosts, {couch_httpd_vhost, start_link, []}},
         {uuids, {couch_uuids, start, []}}
-    ],
+    ] ++ couch_index_servers(),
 
     MaybeHttp = case http_enabled() of
         true -> [{httpd, {couch_httpd, start_link, []}}];
@@ -70,3 +69,11 @@ https_enabled() ->
     LegacySSLEnabled = LegacySSL =:= "{chttpd, start_link, [https]}",
 
     SSLEnabled orelse LegacySSLEnabled.
+
+couch_index_servers() ->
+    N = couch_index_server:num_servers(),
+    [couch_index_server(I) || I <- lists:seq(1, N)].
+
+couch_index_server(N) ->
+    Name = couch_index_server:couch_index_server(N),
+    {Name, {couch_index_server, start_link, [N]}}.
diff --git a/src/couch_index/src/couch_index.app.src b/src/couch_index/src/couch_index.app.src
index 3aa92ba..834be3f 100644
--- a/src/couch_index/src/couch_index.app.src
+++ b/src/couch_index/src/couch_index.app.src
@@ -13,7 +13,7 @@
 {application, couch_index, [
     {description, "CouchDB Secondary Index Manager"},
     {vsn, git},
-    {registered, [couch_index_server]},
+    {registered, []},
     {applications, [kernel, stdlib, couch_epi]},
     {mod, {couch_index_app, []}}
 ]}.
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 6bebff2..e68c4fc 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -16,11 +16,15 @@
 
 -vsn(2).
 
--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
+-export([start_link/1, validate/2, get_index/4, get_index/3, get_index/2]).
 
 -export([init/1, terminate/2, code_change/3]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 
+% Sharding functions
+-export([num_servers/0, couch_index_server/1, by_sig/1, by_pid/1, by_db/1]).
+-export([aggregate_queue_len/0]).
+
 % Exported for callbacks
 -export([
     handle_config_change/5,
@@ -30,15 +34,18 @@
 
 -include_lib("couch/include/couch_db.hrl").
 
--define(BY_SIG, couchdb_indexes_by_sig).
--define(BY_PID, couchdb_indexes_by_pid).
--define(BY_DB, couchdb_indexes_by_db).
 -define(RELISTEN_DELAY, 5000).
 
--record(st, {root_dir}).
+-record(st, {
+    root_dir,
+    index_server,
+    by_sig,
+    by_pid,
+    by_db
+}).
 
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(N) ->
+    gen_server:start_link({local, couch_index_server(N)}, ?MODULE, [N], []).
 
 
 validate(Db, DDoc) ->
@@ -103,89 +110,96 @@ get_index(Module, Db, DDoc, _Fun) ->
 get_index(Module, IdxState) ->
     DbName = Module:get(db_name, IdxState),
     Sig = Module:get(signature, IdxState),
-    case ets:lookup(?BY_SIG, {DbName, Sig}) of
+    case ets:lookup(by_sig(DbName), {DbName, Sig}) of
         [{_, Pid}] when is_pid(Pid) ->
             DDocId = Module:get(idx_name, IdxState),
-            case ets:match_object(?BY_DB, {DbName, {DDocId, Sig}}) of
+            case ets:match_object(by_db(DbName), {DbName, {DDocId, Sig}}) of
                 [] ->
                     Args = [Pid, DbName, DDocId, Sig],
-                    gen_server:cast(?MODULE, {add_to_ets, Args});
+                    gen_server:cast(couch_index_server(DbName), {add_to_ets, Args});
                 _ -> ok
             end,
             {ok, Pid};
         _ ->
             Args = {Module, IdxState, DbName, Sig},
-            gen_server:call(?MODULE, {get_index, Args}, infinity)
+            gen_server:call(couch_index_server(DbName), {get_index, Args}, infinity)
     end.
 
 
-init([]) ->
+init([N]) ->
     process_flag(trap_exit, true),
-    ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
-    ets:new(?BY_SIG, [protected, set, named_table]),
-    ets:new(?BY_PID, [private, set, named_table]),
-    ets:new(?BY_DB, [protected, bag, named_table]),
-    couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
+    ets:new(by_sig(N), [protected, set, named_table]),
+    ets:new(by_pid(N), [private, set, named_table]),
+    ets:new(by_db(N), [protected, bag, named_table]),
     RootDir = couch_index_util:root_dir(),
     couch_file:init_delete_dir(RootDir),
-    {ok, #st{root_dir=RootDir}}.
+    St = #st{
+        root_dir=RootDir,
+        index_server=couch_index_server(N),
+        by_sig=by_sig(N),
+        by_pid=by_pid(N),
+        by_db=by_db(N)
+    },
+    ok = config:listen_for_changes(?MODULE, St),
+    couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]),
+    {ok, St}.
 
 
-terminate(_Reason, _State) ->
-    Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)],
+terminate(_Reason, State) ->
+    Pids = [Pid || {Pid, _} <- ets:tab2list(State#st.by_pid)],
     lists:map(fun couch_util:shutdown_sync/1, Pids),
     ok.
 
 
 handle_call({get_index, {_Mod, _IdxState, DbName, Sig}=Args}, From, State) ->
-    case ets:lookup(?BY_SIG, {DbName, Sig}) of
+    case ets:lookup(State#st.by_sig, {DbName, Sig}) of
         [] ->
             spawn_link(fun() -> new_index(Args) end),
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
+            ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}),
             {noreply, State};
         [{_, Waiters}] when is_list(Waiters) ->
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
+            ets:insert(State#st.by_sig, {{DbName, Sig}, [From | Waiters]}),
             {noreply, State};
         [{_, Pid}] when is_pid(Pid) ->
             {reply, {ok, Pid}, State}
     end;
 handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
     [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
     link(Pid),
-    add_to_ets(DbName, Sig, DDocId, Pid),
+    add_to_ets(DbName, Sig, DDocId, Pid, State),
     {reply, ok, State};
 handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
     [gen_server:reply(From, Error) || From <- Waiters],
-    ets:delete(?BY_SIG, {DbName, Sig}),
+    ets:delete(State#st.by_sig, {DbName, Sig}),
     {reply, ok, State};
 handle_call({reset_indexes, DbName}, _From, State) ->
-    reset_indexes(DbName, State#st.root_dir),
+    reset_indexes(DbName, State),
     {reply, ok, State}.
 
 
 handle_cast({reset_indexes, DbName}, State) ->
-    reset_indexes(DbName, State#st.root_dir),
+    reset_indexes(DbName, State),
     {noreply, State};
 handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
     % check if Pid still exists
-    case ets:lookup(?BY_PID, Pid) of
+    case ets:lookup(State#st.by_pid, Pid) of
         [{Pid, {DbName, Sig}}] when is_pid(Pid) ->
-            ets:insert(?BY_DB, {DbName, {DDocId, Sig}});
+            ets:insert(State#st.by_db, {DbName, {DDocId, Sig}});
         _ -> ok
     end,
     {noreply, State};
 handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
-    ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}),
+    ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}),
     {noreply, State}.
 
 handle_info({'EXIT', Pid, Reason}, Server) ->
-    case ets:lookup(?BY_PID, Pid) of
+    case ets:lookup(Server#st.by_pid, Pid) of
         [{Pid, {DbName, Sig}}] ->
             DDocIds = [DDocId || {_, {DDocId, _}}
-                <- ets:match_object(?BY_DB, {DbName, {'$1', Sig}})],
-            rem_from_ets(DbName, Sig, DDocIds, Pid);
+                <- ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})],
+            rem_from_ets(DbName, Sig, DDocIds, Pid, Server);
         [] when Reason /= normal ->
             exit(Reason);
         _Else ->
@@ -204,23 +218,23 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
-handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
-handle_config_change("couchdb", "index_dir", _, _, _) ->
-    exit(whereis(couch_index_server), config_change),
+handle_config_change("couchdb", "index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
+    {ok, St};
+handle_config_change("couchdb", "view_index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
+    {ok, St};
+handle_config_change("couchdb", "index_dir", _, _, St) ->
+    exit(whereis(St#st.index_server), config_change),
     remove_handler;
-handle_config_change("couchdb", "view_index_dir", _, _, _) ->
-    exit(whereis(couch_index_server), config_change),
+handle_config_change("couchdb", "view_index_dir", _, _, St) ->
+    exit(whereis(St#st.index_server), config_change),
     remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
-    {ok, RootDir}.
+handle_config_change(_, _, _, _, St) ->
+    {ok, St}.
 
 handle_config_terminate(_, stop, _) ->
     ok;
-handle_config_terminate(_Server, _Reason, _State) ->
-    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener),
+handle_config_terminate(_Server, _Reason, State) ->
+    erlang:send_after(?RELISTEN_DELAY, whereis(State#st.index_server), restart_config_listener),
     {ok, couch_index_util:root_dir()}.
 
 new_index({Mod, IdxState, DbName, Sig}) ->
@@ -228,21 +242,21 @@ new_index({Mod, IdxState, DbName, Sig}) ->
     case couch_index:start_link({Mod, IdxState}) of
         {ok, Pid} ->
             ok = gen_server:call(
-                ?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}}),
+                couch_index_server(DbName), {async_open, {DbName, DDocId, Sig}, {ok, Pid}}),
             unlink(Pid);
         Error ->
             ok = gen_server:call(
-                ?MODULE, {async_error, {DbName, DDocId, Sig}, Error})
+                couch_index_server(DbName), {async_error, {DbName, DDocId, Sig}, Error})
     end.
 
 
-reset_indexes(DbName, Root) ->
+reset_indexes(DbName, #st{} = State) ->
     % shutdown all the updaters and clear the files, the db got changed
     SigDDocIds = lists:foldl(fun({_, {DDocId, Sig}}, DDict) ->
         dict:append(Sig, DDocId, DDict)
-    end, dict:new(), ets:lookup(?BY_DB, DbName)),
+    end, dict:new(), ets:lookup(State#st.by_db, DbName)),
     Fun = fun({Sig, DDocIds}) ->
-        [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+        [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
         unlink(Pid),
         gen_server:cast(Pid, delete),
         receive
@@ -252,34 +266,42 @@ reset_indexes(DbName, Root) ->
             0 ->
                 ok
         end,
-        rem_from_ets(DbName, Sig, DDocIds, Pid)
+        rem_from_ets(DbName, Sig, DDocIds, Pid, State)
     end,
     lists:foreach(Fun, dict:to_list(SigDDocIds)),
     Path = couch_index_util:index_dir("", DbName),
-    couch_file:nuke_dir(Root, Path).
+    couch_file:nuke_dir(State#st.root_dir, Path).
 
 
-add_to_ets(DbName, Sig, DDocId, Pid) ->
-    ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
-    ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
-    ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
+add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
+    ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
+    ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
+    ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
 
 
-rem_from_ets(DbName, Sig, DDocIds, Pid) ->
-    ets:delete(?BY_SIG, {DbName, Sig}),
-    ets:delete(?BY_PID, Pid),
+rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
+    ets:delete(St#st.by_sig, {DbName, Sig}),
+    ets:delete(St#st.by_pid, Pid),
     lists:foreach(fun(DDocId) ->
-        ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}})
+        ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}})
     end, DDocIds).
 
 
-handle_db_event(DbName, created, St) ->
-    gen_server:cast(?MODULE, {reset_indexes, DbName}),
+handle_db_event(DbName, Event, #st{} = St) ->
+    case couch_index_server(DbName) == St#st.index_server of
+        true ->
+            handle_db_event_int(DbName, Event, St);
+        false ->
+            {ok, St}
+    end.
+
+handle_db_event_int(DbName, created, St) ->
+    gen_server:cast(St#st.index_server, {reset_indexes, DbName}),
     {ok, St};
-handle_db_event(DbName, deleted, St) ->
-    gen_server:cast(?MODULE, {reset_indexes, DbName}),
+handle_db_event_int(DbName, deleted, St) ->
+    gen_server:cast(St#st.index_server, {reset_indexes, DbName}),
     {ok, St};
-handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated,
+handle_db_event_int(<<"shards/", _/binary>> = DbName, {ddoc_updated,
         DDocId}, St) ->
     DDocResult = couch_util:with_db(DbName, fun(Db) ->
         couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
@@ -292,31 +314,70 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated,
     lists:foreach(fun(DbShard) ->
         lists:foreach(fun({_DbShard, {_DDocId, Sig}}) ->
             % check if there are other ddocs with the same Sig for the same db
-            SigDDocs = ets:match_object(?BY_DB, {DbShard, {'$1', Sig}}),
+            SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
             if length(SigDDocs) > 1 ->
-                % remove records from ?BY_DB for this DDoc
+                % remove records from by_db for this DDoc
                 Args = [DbShard, DDocId, Sig],
-                gen_server:cast(?MODULE, {rem_from_ets, Args});
+                gen_server:cast(St#st.index_server, {rem_from_ets, Args});
             true ->
                 % single DDoc with this Sig - close couch_index processes
-                case ets:lookup(?BY_SIG, {DbShard, Sig}) of
+                case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
                     [{_, IndexPid}] -> (catch
                         gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
                     [] -> []
                 end
             end
-        end, ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}}))
+        end, ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}}))
     end, DbShards),
     {ok, St};
-handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
+handle_db_event_int(DbName, {ddoc_updated, DDocId}, St) ->
     lists:foreach(fun({_DbName, {_DDocId, Sig}}) ->
-        case ets:lookup(?BY_SIG, {DbName, Sig}) of
+        case ets:lookup(St#st.by_sig, {DbName, Sig}) of
             [{_, IndexPid}] ->
                 (catch gen_server:cast(IndexPid, ddoc_updated));
             [] ->
                 ok
         end
-    end, ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}})),
+    end, ets:match_object(St#st.by_db, {DbName, {DDocId, '$1'}})),
     {ok, St};
-handle_db_event(_DbName, _Event, St) ->
+handle_db_event_int(_DbName, _Event, St) ->
     {ok, St}.
+
+
+num_servers() ->
+    erlang:system_info(schedulers).
+
+
+couch_index_server(Arg) ->
+    name("index_server", Arg).
+
+
+by_sig(Arg) ->
+    name("couchdb_indexes_by_sig", Arg).
+
+
+by_pid(Arg) ->
+    name("couchdb_indexes_by_pid", Arg).
+
+
+by_db(Arg) ->
+    name("couchdb_indexes_by_db", Arg).
+
+
+name(BaseName, Sig) when is_list(Sig) ->
+    name(BaseName, ?l2b(Sig));
+
+name(BaseName, Sig) when is_binary(Sig) ->
+    N = 1 + erlang:phash2(Sig, num_servers()),
+    name(BaseName, N);
+
+name(BaseName, N) when is_integer(N), N > 0 ->
+    list_to_atom(BaseName ++ "_" ++ integer_to_list(N)).
+
+
+aggregate_queue_len() ->
+    N = num_servers(),
+    Names = [couch_index_server(I) || I <- lists:seq(1, N)],
+    MQs = [process_info(whereis(Name), message_queue_len) ||
+        Name <- Names],
+    lists:sum([X || {_, X} <- MQs]).
diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
index 0e23adf..8d06ca4 100644
--- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
+++ b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
@@ -74,7 +74,7 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
             couch_index_server:get_index(test_index, ShardDb, DDoc)
         end, DbShards),
 
-        IndexesBefore = get_indexes_by_ddoc(DDocID, N),
+        IndexesBefore = get_indexes_by_ddoc(DbName, DDocID, N),
         ?assertEqual(N, length(IndexesBefore)),
 
         AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
@@ -94,7 +94,7 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
             couch_db:name(DbShard), {ddoc_updated, DDocID}, {st, ""}),
 
         ok = meck:wait(N, test_index, init, ['_', '_'], 5000),
-        IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
+        IndexesAfter = get_indexes_by_ddoc(DbName, DDocID, 0),
         ?assertEqual(0, length(IndexesAfter)),
 
         %% assert that previously running indexes are gone
@@ -125,10 +125,10 @@ fake_index() ->
     ok = meck:expect(test_index, shutdown, ['_'], ok).
 
 
-get_indexes_by_ddoc(DDocID, N) ->
+get_indexes_by_ddoc(DbName0, DDocID, N) ->
     Indexes = test_util:wait(fun() ->
         Indxs = ets:match_object(
-            couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}),
+            couch_index_server:by_db(DbName0), {'$1', {DDocID, '$2'}}),
         case length(Indxs) == N of
             true ->
                 Indxs;
@@ -137,7 +137,7 @@ get_indexes_by_ddoc(DDocID, N) ->
         end
     end),
     lists:foldl(fun({DbName, {_DDocID, Sig}}, Acc) ->
-        case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+        case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of
             [{_, Pid}] -> [Pid|Acc];
             _ -> Acc
         end
diff --git a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
index 4310157..dddcc83 100644
--- a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
+++ b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
@@ -85,7 +85,7 @@ check_indexing_stops_on_ddoc_change(Db) ->
     ?_test(begin
         DDocID = <<"_design/bar">>,
 
-        IndexesBefore = get_indexes_by_ddoc(DDocID, 1),
+        IndexesBefore = get_indexes_by_ddoc(Db#db.name, DDocID, 1),
         ?assertEqual(1, length(IndexesBefore)),
         AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
         ?assertEqual(1, length(AliveBefore)),
@@ -117,17 +117,17 @@ check_indexing_stops_on_ddoc_change(Db) ->
         end,
 
         %% assert that previously running indexes are gone
-        IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
+        IndexesAfter = get_indexes_by_ddoc(Db#db.name, DDocID, 0),
         ?assertEqual(0, length(IndexesAfter)),
         AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
         ?assertEqual(0, length(AliveAfter))
     end).
 
 
-get_indexes_by_ddoc(DDocID, N) ->
+get_indexes_by_ddoc(DbName0, DDocID, N) ->
     Indexes = test_util:wait(fun() ->
         Indxs = ets:match_object(
-            couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}),
+            couch_index_server:by_db(DbName0), {'$1', {DDocID, '$2'}}),
         case length(Indxs) == N of
             true ->
                 Indxs;
@@ -136,7 +136,7 @@ get_indexes_by_ddoc(DDocID, N) ->
         end
     end),
     lists:foldl(fun({DbName, {_DDocID, Sig}}, Acc) ->
-        case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+        case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of
             [{_, Pid}] -> [Pid|Acc];
             _ -> Acc
         end