You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2021/12/03 11:17:27 UTC

[couchdb] branch sharded_couch_index_server updated (253c9dc -> 66a1316)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch sharded_couch_index_server
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 253c9dc  Add sharding to couch_index_server
     new 66a1316  Add sharding to couch_index_server

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (253c9dc)
            \
             N -- N -- N   refs/heads/sharded_couch_index_server (66a1316)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch_index/src/couch_index_server.erl         |  19 ++--
 .../test/eunit/couch_index_ddoc_updated_tests.erl  | 109 ++++++++++-----------
 2 files changed, 58 insertions(+), 70 deletions(-)

[couchdb] 01/01: Add sharding to couch_index_server

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch sharded_couch_index_server
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 66a1316fe1f5e740acee7864b5560f1659a38f6e
Author: Robert Newson <rn...@apache.org>
AuthorDate: Tue Nov 30 15:39:58 2021 +0000

    Add sharding to couch_index_server
---
 src/chttpd/src/chttpd_node.erl                     |   3 +-
 src/couch/src/couch_secondary_sup.erl              |  20 ++-
 src/couch_index/src/couch_index.app.src            |   2 +-
 src/couch_index/src/couch_index_server.erl         | 189 +++++++++++++--------
 .../test/eunit/couch_index_ddoc_updated_tests.erl  | 101 ++++++-----
 .../test/eunit/couch_mrview_ddoc_updated_tests.erl |  10 +-
 6 files changed, 188 insertions(+), 137 deletions(-)

diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl
index 7379dba..63a7fb1 100644
--- a/src/chttpd/src/chttpd_node.erl
+++ b/src/chttpd/src/chttpd_node.erl
@@ -282,7 +282,8 @@ get_stats() ->
     MessageQueues0 = [
         {couch_file, {CF}},
         {couch_db_updater, {CDU}},
-        {couch_server, couch_server:aggregate_queue_len()}
+        {couch_server, couch_server:aggregate_queue_len()},
+        {index_server, couch_index_server:aggregate_queue_len()}
     ],
     MessageQueues = MessageQueues0 ++ message_queues(registered()),
     {SQ, DCQ} = run_queues(),
diff --git a/src/couch/src/couch_secondary_sup.erl b/src/couch/src/couch_secondary_sup.erl
index a328c17..3eab831 100644
--- a/src/couch/src/couch_secondary_sup.erl
+++ b/src/couch/src/couch_secondary_sup.erl
@@ -22,12 +22,12 @@ init([]) ->
         {couch_plugin_event, {gen_event, start_link, [{local, couch_plugin}]}, permanent,
             brutal_kill, worker, dynamic}
     ],
-    Daemons = [
-        {index_server, {couch_index_server, start_link, []}},
-        {query_servers, {couch_proc_manager, start_link, []}},
-        {vhosts, {couch_httpd_vhost, start_link, []}},
-        {uuids, {couch_uuids, start, []}}
-    ],
+    Daemons =
+        [
+            {query_servers, {couch_proc_manager, start_link, []}},
+            {vhosts, {couch_httpd_vhost, start_link, []}},
+            {uuids, {couch_uuids, start, []}}
+        ] ++ couch_index_servers(),
 
     MaybeHttp =
         case http_enabled() of
@@ -69,3 +69,11 @@ https_enabled() ->
     LegacySSLEnabled = LegacySSL =:= "{chttpd, start_link, [https]}",
 
     SSLEnabled orelse LegacySSLEnabled.
+
+couch_index_servers() ->
+    N = couch_index_server:num_servers(),
+    [couch_index_server(I) || I <- lists:seq(1, N)].
+
+couch_index_server(N) ->
+    Name = couch_index_server:couch_index_server(N),
+    {Name, {couch_index_server, start_link, [N]}}.
diff --git a/src/couch_index/src/couch_index.app.src b/src/couch_index/src/couch_index.app.src
index 3aa92ba..834be3f 100644
--- a/src/couch_index/src/couch_index.app.src
+++ b/src/couch_index/src/couch_index.app.src
@@ -13,7 +13,7 @@
 {application, couch_index, [
     {description, "CouchDB Secondary Index Manager"},
     {vsn, git},
-    {registered, [couch_index_server]},
+    {registered, []},
     {applications, [kernel, stdlib, couch_epi]},
     {mod, {couch_index_app, []}}
 ]}.
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 77f91cc..762249d 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -16,11 +16,15 @@
 
 -vsn(2).
 
--export([start_link/0, validate/2, get_index/4, get_index/3, get_index/2]).
+-export([start_link/1, validate/2, get_index/4, get_index/3, get_index/2]).
 
 -export([init/1, terminate/2, code_change/3]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 
+% Sharding functions
+-export([num_servers/0, couch_index_server/1, by_sig/1, by_pid/1, by_db/1]).
+-export([aggregate_queue_len/0]).
+
 % Exported for callbacks
 -export([
     handle_config_change/5,
@@ -30,15 +34,18 @@
 
 -include_lib("couch/include/couch_db.hrl").
 
--define(BY_SIG, couchdb_indexes_by_sig).
--define(BY_PID, couchdb_indexes_by_pid).
--define(BY_DB, couchdb_indexes_by_db).
 -define(RELISTEN_DELAY, 5000).
 
--record(st, {root_dir}).
+-record(st, {
+    root_dir,
+    index_server,
+    by_sig,
+    by_pid,
+    by_db
+}).
 
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(N) ->
+    gen_server:start_link({local, couch_index_server(N)}, ?MODULE, [N], []).
 
 validate(Db, DDoc) ->
     LoadModFun = fun
@@ -101,90 +108,97 @@ get_index(Module, Db, DDoc, _Fun) ->
 get_index(Module, IdxState) ->
     DbName = Module:get(db_name, IdxState),
     Sig = Module:get(signature, IdxState),
-    case ets:lookup(?BY_SIG, {DbName, Sig}) of
+    case ets:lookup(by_sig(DbName), {DbName, Sig}) of
         [{_, Pid}] when is_pid(Pid) ->
             DDocId = Module:get(idx_name, IdxState),
-            case ets:match_object(?BY_DB, {DbName, {DDocId, Sig}}) of
+            case ets:match_object(by_db(DbName), {DbName, {DDocId, Sig}}) of
                 [] ->
                     Args = [Pid, DbName, DDocId, Sig],
-                    gen_server:cast(?MODULE, {add_to_ets, Args});
+                    gen_server:cast(couch_index_server(DbName), {add_to_ets, Args});
                 _ ->
                     ok
             end,
             {ok, Pid};
         _ ->
             Args = {Module, IdxState, DbName, Sig},
-            gen_server:call(?MODULE, {get_index, Args}, infinity)
+            gen_server:call(couch_index_server(DbName), {get_index, Args}, infinity)
     end.
 
-init([]) ->
+init([N]) ->
     process_flag(trap_exit, true),
-    ok = config:listen_for_changes(?MODULE, couch_index_util:root_dir()),
-    ets:new(?BY_SIG, [protected, set, named_table]),
-    ets:new(?BY_PID, [private, set, named_table]),
-    ets:new(?BY_DB, [protected, bag, named_table]),
-    couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
+    ets:new(by_sig(N), [protected, set, named_table]),
+    ets:new(by_pid(N), [private, set, named_table]),
+    ets:new(by_db(N), [protected, bag, named_table]),
     RootDir = couch_index_util:root_dir(),
     couch_file:init_delete_dir(RootDir),
-    {ok, #st{root_dir = RootDir}}.
+    St = #st{
+        root_dir = RootDir,
+        index_server = couch_index_server(N),
+        by_sig = by_sig(N),
+        by_pid = by_pid(N),
+        by_db = by_db(N)
+    },
+    ok = config:listen_for_changes(?MODULE, St),
+    couch_event:link_listener(?MODULE, handle_db_event, St, [all_dbs]),
+    {ok, St}.
 
-terminate(_Reason, _State) ->
-    Pids = [Pid || {Pid, _} <- ets:tab2list(?BY_PID)],
+terminate(_Reason, State) ->
+    Pids = [Pid || {Pid, _} <- ets:tab2list(State#st.by_pid)],
     lists:map(fun couch_util:shutdown_sync/1, Pids),
     ok.
 
 handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
-    case ets:lookup(?BY_SIG, {DbName, Sig}) of
+    case ets:lookup(State#st.by_sig, {DbName, Sig}) of
         [] ->
             spawn_link(fun() -> new_index(Args) end),
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From]}),
+            ets:insert(State#st.by_sig, {{DbName, Sig}, [From]}),
             {noreply, State};
         [{_, Waiters}] when is_list(Waiters) ->
-            ets:insert(?BY_SIG, {{DbName, Sig}, [From | Waiters]}),
+            ets:insert(State#st.by_sig, {{DbName, Sig}, [From | Waiters]}),
             {noreply, State};
         [{_, Pid}] when is_pid(Pid) ->
             {reply, {ok, Pid}, State}
     end;
 handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
     [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
     link(Pid),
-    add_to_ets(DbName, Sig, DDocId, Pid),
+    add_to_ets(DbName, Sig, DDocId, Pid, State),
     {reply, ok, State};
 handle_call({async_error, {DbName, _DDocId, Sig}, Error}, _From, State) ->
-    [{_, Waiters}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
     [gen_server:reply(From, Error) || From <- Waiters],
-    ets:delete(?BY_SIG, {DbName, Sig}),
+    ets:delete(State#st.by_sig, {DbName, Sig}),
     {reply, ok, State};
 handle_call({reset_indexes, DbName}, _From, State) ->
-    reset_indexes(DbName, State#st.root_dir),
+    reset_indexes(DbName, State),
     {reply, ok, State}.
 
 handle_cast({reset_indexes, DbName}, State) ->
-    reset_indexes(DbName, State#st.root_dir),
+    reset_indexes(DbName, State),
     {noreply, State};
 handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
     % check if Pid still exists
-    case ets:lookup(?BY_PID, Pid) of
+    case ets:lookup(State#st.by_pid, Pid) of
         [{Pid, {DbName, Sig}}] when is_pid(Pid) ->
-            ets:insert(?BY_DB, {DbName, {DDocId, Sig}});
+            ets:insert(State#st.by_db, {DbName, {DDocId, Sig}});
         _ ->
             ok
     end,
     {noreply, State};
 handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
-    ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}),
+    ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}),
     {noreply, State}.
 
 handle_info({'EXIT', Pid, Reason}, Server) ->
-    case ets:lookup(?BY_PID, Pid) of
+    case ets:lookup(Server#st.by_pid, Pid) of
         [{Pid, {DbName, Sig}}] ->
             DDocIds = [
                 DDocId
              || {_, {DDocId, _}} <-
-                    ets:match_object(?BY_DB, {DbName, {'$1', Sig}})
+                    ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
             ],
-            rem_from_ets(DbName, Sig, DDocIds, Pid);
+            rem_from_ets(DbName, Sig, DDocIds, Pid, Server);
         [] when Reason /= normal ->
             exit(Reason);
         _Else ->
@@ -201,23 +215,23 @@ handle_info(Msg, State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-handle_config_change("couchdb", "index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
-handle_config_change("couchdb", "view_index_dir", RootDir, _, RootDir) ->
-    {ok, RootDir};
-handle_config_change("couchdb", "index_dir", _, _, _) ->
-    exit(whereis(couch_index_server), config_change),
+handle_config_change("couchdb", "index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
+    {ok, St};
+handle_config_change("couchdb", "view_index_dir", RootDir, _, #st{root_dir = RootDir} = St) ->
+    {ok, St};
+handle_config_change("couchdb", "index_dir", _, _, St) ->
+    exit(whereis(St#st.index_server), config_change),
     remove_handler;
-handle_config_change("couchdb", "view_index_dir", _, _, _) ->
-    exit(whereis(couch_index_server), config_change),
+handle_config_change("couchdb", "view_index_dir", _, _, St) ->
+    exit(whereis(St#st.index_server), config_change),
     remove_handler;
-handle_config_change(_, _, _, _, RootDir) ->
-    {ok, RootDir}.
+handle_config_change(_, _, _, _, St) ->
+    {ok, St}.
 
 handle_config_terminate(_, stop, _) ->
     ok;
-handle_config_terminate(_Server, _Reason, _State) ->
-    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener),
+handle_config_terminate(_Server, _Reason, State) ->
+    erlang:send_after(?RELISTEN_DELAY, whereis(State#st.index_server), restart_config_listener),
     {ok, couch_index_util:root_dir()}.
 
 new_index({Mod, IdxState, DbName, Sig}) ->
@@ -225,26 +239,26 @@ new_index({Mod, IdxState, DbName, Sig}) ->
     case couch_index:start_link({Mod, IdxState}) of
         {ok, Pid} ->
             ok = gen_server:call(
-                ?MODULE, {async_open, {DbName, DDocId, Sig}, {ok, Pid}}
+                couch_index_server(DbName), {async_open, {DbName, DDocId, Sig}, {ok, Pid}}
             ),
             unlink(Pid);
         Error ->
             ok = gen_server:call(
-                ?MODULE, {async_error, {DbName, DDocId, Sig}, Error}
+                couch_index_server(DbName), {async_error, {DbName, DDocId, Sig}, Error}
             )
     end.
 
-reset_indexes(DbName, Root) ->
+reset_indexes(DbName, #st{} = State) ->
     % shutdown all the updaters and clear the files, the db got changed
     SigDDocIds = lists:foldl(
         fun({_, {DDocId, Sig}}, DDict) ->
             dict:append(Sig, DDocId, DDict)
         end,
         dict:new(),
-        ets:lookup(?BY_DB, DbName)
+        ets:lookup(State#st.by_db, DbName)
     ),
     Fun = fun({Sig, DDocIds}) ->
-        [{_, Pid}] = ets:lookup(?BY_SIG, {DbName, Sig}),
+        [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
         unlink(Pid),
         gen_server:cast(Pid, delete),
         receive
@@ -253,32 +267,32 @@ reset_indexes(DbName, Root) ->
         after 0 ->
             ok
         end,
-        rem_from_ets(DbName, Sig, DDocIds, Pid)
+        rem_from_ets(DbName, Sig, DDocIds, Pid, State)
     end,
     lists:foreach(Fun, dict:to_list(SigDDocIds)),
     Path = couch_index_util:index_dir("", DbName),
-    couch_file:nuke_dir(Root, Path).
+    couch_file:nuke_dir(State#st.root_dir, Path).
 
-add_to_ets(DbName, Sig, DDocId, Pid) ->
-    ets:insert(?BY_SIG, {{DbName, Sig}, Pid}),
-    ets:insert(?BY_PID, {Pid, {DbName, Sig}}),
-    ets:insert(?BY_DB, {DbName, {DDocId, Sig}}).
+add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
+    ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
+    ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
+    ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
 
-rem_from_ets(DbName, Sig, DDocIds, Pid) ->
-    ets:delete(?BY_SIG, {DbName, Sig}),
-    ets:delete(?BY_PID, Pid),
+rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
+    ets:delete(St#st.by_sig, {DbName, Sig}),
+    ets:delete(St#st.by_pid, Pid),
     lists:foreach(
         fun(DDocId) ->
-            ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}})
+            ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}})
         end,
         DDocIds
     ).
 
 handle_db_event(DbName, created, St) ->
-    gen_server:cast(?MODULE, {reset_indexes, DbName}),
+    gen_server:cast(St#st.index_server, {reset_indexes, DbName}),
     {ok, St};
 handle_db_event(DbName, deleted, St) ->
-    gen_server:cast(?MODULE, {reset_indexes, DbName}),
+    gen_server:cast(St#st.index_server, {reset_indexes, DbName}),
     {ok, St};
 handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
     DDocResult = couch_util:with_db(DbName, fun(Db) ->
@@ -297,15 +311,15 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
             lists:foreach(
                 fun({_DbShard, {_DDocId, Sig}}) ->
                     % check if there are other ddocs with the same Sig for the same db
-                    SigDDocs = ets:match_object(?BY_DB, {DbShard, {'$1', Sig}}),
+                    SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}),
                     if
                         length(SigDDocs) > 1 ->
-                            % remove records from ?BY_DB for this DDoc
+                            % remove records from by_db for this DDoc
                             Args = [DbShard, DDocId, Sig],
-                            gen_server:cast(?MODULE, {rem_from_ets, Args});
+                            gen_server:cast(St#st.index_server, {rem_from_ets, Args});
                         true ->
                             % single DDoc with this Sig - close couch_index processes
-                            case ets:lookup(?BY_SIG, {DbShard, Sig}) of
+                            case ets:lookup(St#st.by_sig, {DbShard, Sig}) of
                                 [{_, IndexPid}] ->
                                     (catch gen_server:cast(IndexPid, {ddoc_updated, DDocResult}));
                                 [] ->
@@ -313,7 +327,7 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
                             end
                     end
                 end,
-                ets:match_object(?BY_DB, {DbShard, {DDocId, '$1'}})
+                ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
             )
         end,
         DbShards
@@ -322,15 +336,48 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
 handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
     lists:foreach(
         fun({_DbName, {_DDocId, Sig}}) ->
-            case ets:lookup(?BY_SIG, {DbName, Sig}) of
+            case ets:lookup(St#st.by_sig, {DbName, Sig}) of
                 [{_, IndexPid}] ->
                     (catch gen_server:cast(IndexPid, ddoc_updated));
                 [] ->
                     ok
             end
         end,
-        ets:match_object(?BY_DB, {DbName, {DDocId, '$1'}})
+        ets:match_object(St#st.by_db, {DbName, {DDocId, '$1'}})
     ),
     {ok, St};
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
+
+
+num_servers() ->
+    erlang:system_info(schedulers).
+
+couch_index_server(Arg) ->
+    name("index_server", Arg).
+
+by_sig(Arg) ->
+    name("couchdb_indexes_by_sig", Arg).
+
+by_pid(Arg) ->
+    name("couchdb_indexes_by_pid", Arg).
+
+by_db(Arg) ->
+    name("couchdb_indexes_by_db", Arg).
+
+name(BaseName, Sig) when is_list(Sig) ->
+    name(BaseName, ?l2b(Sig));
+name(BaseName, Sig) when is_binary(Sig) ->
+    N = 1 + erlang:phash2(Sig, num_servers()),
+    name(BaseName, N);
+name(BaseName, N) when is_integer(N), N > 0 ->
+    list_to_atom(BaseName ++ "_" ++ integer_to_list(N)).
+
+aggregate_queue_len() ->
+    N = num_servers(),
+    Names = [couch_index_server(I) || I <- lists:seq(1, N)],
+    MQs = [
+        process_info(whereis(Name), message_queue_len)
+     || Name <- Names
+    ],
+    lists:sum([X || {_, X} <- MQs]).
diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
index 7bee8ba..ea0b97e 100644
--- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
+++ b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
@@ -15,6 +15,7 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+
 start() ->
     fake_index(),
     Ctx = test_util:start_couch([mem3, fabric]),
@@ -22,68 +23,56 @@ start() ->
     ok = fabric:create_db(DbName, [?ADMIN_CTX]),
     {Ctx, DbName}.
 
+
 stop({Ctx, DbName}) ->
     meck:unload(test_index),
     ok = fabric:delete_db(DbName, [?ADMIN_CTX]),
     DbDir = config:get("couchdb", "database_dir", "."),
     WaitFun = fun() ->
-        filelib:fold_files(
-            DbDir,
-            <<".*", DbName/binary, "\.[0-9]+.*">>,
-            true,
-            fun(_F, _A) -> wait end,
-            ok
-        )
+        filelib:fold_files(DbDir, <<".*", DbName/binary, "\.[0-9]+.*">>,
+            true, fun(_F, _A) -> wait end, ok)
     end,
     ok = test_util:wait(WaitFun),
     test_util:stop_couch(Ctx),
     ok.
 
+
 ddoc_update_test_() ->
     {
         "Check ddoc update actions",
         {
             setup,
-            fun start/0,
-            fun stop/1,
+            fun start/0, fun stop/1,
             fun check_all_indexers_exit_on_ddoc_change/1
         }
     }.
 
+
 check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
     ?_test(begin
-        [DbShard1 | RestDbShards] = lists:map(
-            fun(Sh) ->
-                {ok, ShardDb} = couch_db:open(mem3:name(Sh), []),
-                ShardDb
-            end,
-            mem3:local_shards(mem3:dbname(DbName))
-        ),
+        [DbShard1 | RestDbShards] = lists:map(fun(Sh) ->
+           {ok, ShardDb} = couch_db:open(mem3:name(Sh), []),
+            ShardDb
+        end, mem3:local_shards(mem3:dbname(DbName))),
 
         % create a DDoc on Db1
         DDocID = <<"idx_name">>,
-        DDocJson = couch_doc:from_json_obj(
-            {[
-                {<<"_id">>, DDocID},
-                {<<"value">>, 1}
-            ]}
-        ),
+        DDocJson = couch_doc:from_json_obj({[
+           {<<"_id">>, DDocID},
+           {<<"value">>, 1}
+        ]}),
         {ok, _Rev} = couch_db:update_doc(DbShard1, DDocJson, []),
         {ok, DbShard} = couch_db:reopen(DbShard1),
         {ok, DDoc} = couch_db:open_doc(
-            DbShard, DDocID, [ejson_body, ?ADMIN_CTX]
-        ),
+            DbShard, DDocID, [ejson_body, ?ADMIN_CTX]),
         DbShards = [DbShard | RestDbShards],
         N = length(DbShards),
 
         % run couch_index process for each shard database
         ok = meck:reset(test_index),
-        lists:foreach(
-            fun(ShardDb) ->
-                couch_index_server:get_index(test_index, ShardDb, DDoc)
-            end,
-            DbShards
-        ),
+        lists:foreach(fun(ShardDb) ->
+            couch_index_server:get_index(test_index, ShardDb, DDoc)
+        end, DbShards),
 
         IndexesBefore = get_indexes_by_ddoc(DDocID, N),
         ?assertEqual(N, length(IndexesBefore)),
@@ -92,20 +81,24 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
         ?assertEqual(N, length(AliveBefore)),
 
         % update ddoc
-        DDocJson2 = couch_doc:from_json_obj(
-            {[
-                {<<"_id">>, DDocID},
-                {<<"value">>, 2},
-                {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)}
-            ]}
-        ),
+        DDocJson2 = couch_doc:from_json_obj({[
+            {<<"_id">>, DDocID},
+            {<<"value">>, 2},
+            {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)}
+        ]}),
         {ok, _} = couch_db:update_doc(DbShard, DDocJson2, []),
 
         % assert that all index processes exit after ddoc updated
         ok = meck:reset(test_index),
-        couch_index_server:handle_db_event(
-            couch_db:name(DbShard), {ddoc_updated, DDocID}, {st, ""}
-        ),
+        lists:foreach(fun(I) ->
+            couch_index_server:handle_db_event(
+                couch_db:name(DbShard), {ddoc_updated, DDocID},
+                {st, "",
+                couch_index_server:couch_index_server(I),
+                couch_index_server:by_sig(I),
+                couch_index_server:by_pid(I),
+                couch_index_server:by_db(I)})
+        end, seq()),
 
         ok = meck:wait(N, test_index, init, ['_', '_'], 5000),
         IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
@@ -117,6 +110,7 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
         ok
     end).
 
+
 fake_index() ->
     ok = meck:new([test_index], [non_strict]),
     ok = meck:expect(test_index, init, fun(Db, DDoc) ->
@@ -137,11 +131,13 @@ fake_index() ->
     end),
     ok = meck:expect(test_index, shutdown, ['_'], ok).
 
+
 get_indexes_by_ddoc(DDocID, N) ->
     Indexes = test_util:wait(fun() ->
-        Indxs = ets:match_object(
-            couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}
-        ),
+        Indxs = lists:flatmap(fun(I) ->
+            ets:match_object(
+            couch_index_server:by_db(I), {'$1', {DDocID, '$2'}})
+        end, seq()),
         case length(Indxs) == N of
             true ->
                 Indxs;
@@ -149,13 +145,12 @@ get_indexes_by_ddoc(DDocID, N) ->
                 wait
         end
     end),
-    lists:foldl(
-        fun({DbName, {_DDocID, Sig}}, Acc) ->
-            case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
-                [{_, Pid}] -> [Pid | Acc];
-                _ -> Acc
-            end
-        end,
-        [],
-        Indexes
-    ).
+    lists:foldl(fun({DbName, {_DDocID, Sig}}, Acc) ->
+        case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of
+            [{_, Pid}] -> [Pid|Acc];
+            _ -> Acc
+        end
+    end, [], Indexes).
+
+seq() ->
+    lists:seq(1, couch_index_server:num_servers()).
diff --git a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
index 2a62994..36b0841 100644
--- a/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
+++ b/src/couch_mrview/test/eunit/couch_mrview_ddoc_updated_tests.erl
@@ -88,7 +88,7 @@ check_indexing_stops_on_ddoc_change(Db) ->
     ?_test(begin
         DDocID = <<"_design/bar">>,
 
-        IndexesBefore = get_indexes_by_ddoc(DDocID, 1),
+        IndexesBefore = get_indexes_by_ddoc(Db#db.name, DDocID, 1),
         ?assertEqual(1, length(IndexesBefore)),
         AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
         ?assertEqual(1, length(AliveBefore)),
@@ -127,16 +127,16 @@ check_indexing_stops_on_ddoc_change(Db) ->
         end,
 
         %% assert that previously running indexes are gone
-        IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
+        IndexesAfter = get_indexes_by_ddoc(Db#db.name, DDocID, 0),
         ?assertEqual(0, length(IndexesAfter)),
         AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndexesBefore),
         ?assertEqual(0, length(AliveAfter))
     end).
 
-get_indexes_by_ddoc(DDocID, N) ->
+get_indexes_by_ddoc(DbName0, DDocID, N) ->
     Indexes = test_util:wait(fun() ->
         Indxs = ets:match_object(
-            couchdb_indexes_by_db, {'$1', {DDocID, '$2'}}
+            couch_index_server:by_db(DbName0), {'$1', {DDocID, '$2'}}
         ),
         case length(Indxs) == N of
             true ->
@@ -147,7 +147,7 @@ get_indexes_by_ddoc(DDocID, N) ->
     end),
     lists:foldl(
         fun({DbName, {_DDocID, Sig}}, Acc) ->
-            case ets:lookup(couchdb_indexes_by_sig, {DbName, Sig}) of
+            case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of
                 [{_, Pid}] -> [Pid | Acc];
                 _ -> Acc
             end