You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2021/02/09 17:15:46 UTC

[couchdb] branch couch_server_sharding created (now b3265de)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch couch_server_sharding
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at b3265de  ick

This branch includes the following new commits:

     new 28b07e1  encapsulate db_updated call in a function
     new 2896a7a  supervise multiple couch_servers
     new b3265de  ick

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/03: encapsulate db_updated call in a function

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch couch_server_sharding
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 28b07e10b00a86e9011eacf2f68eeaf882e7e5ad
Author: Robert Newson <rn...@apache.org>
AuthorDate: Tue Feb 9 14:11:15 2021 +0000

    encapsulate db_updated call in a function
---
 src/couch/src/couch_db_engine.erl  |  2 +-
 src/couch/src/couch_db_updater.erl | 16 ++++++++--------
 src/couch/src/couch_server.erl     |  5 +++++
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 9adc992..918dabc 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -1073,7 +1073,7 @@ finish_compaction(Db, CompactInfo) ->
                 compactor_pid = CompactorPid
             }
     end,
-    ok = gen_server:call(couch_server, {db_updated, NewDb}, infinity),
+    ok = couch_server:db_updated(NewDb),
     {ok, NewDb}.
 
 
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 1ca804c..535acfa 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -79,7 +79,7 @@ handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
     exit(Pid, kill),
     couch_server:delete_compaction_files(Db#db.name),
     Db2 = Db#db{compactor_pid = nil},
-    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    ok = couch_server:db_updated(Db2),
     {reply, ok, Db2, idle_limit()};
 
 handle_call({set_security, NewSec}, _From, #db{} = Db) ->
@@ -87,18 +87,18 @@ handle_call({set_security, NewSec}, _From, #db{} = Db) ->
     NewSecDb = commit_data(NewDb#db{
         security = NewSec
     }),
-    ok = gen_server:call(couch_server, {db_updated, NewSecDb}, infinity),
+    ok = couch_server:db_updated(NewSecDb),
     {reply, ok, NewSecDb, idle_limit()};
 
 handle_call({set_revs_limit, Limit}, _From, Db) ->
     {ok, Db2} = couch_db_engine:set_revs_limit(Db, Limit),
     Db3 = commit_data(Db2),
-    ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
+    ok = couch_server:db_updated(Db3),
     {reply, ok, Db3, idle_limit()};
 
 handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
     {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
-    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    ok = couch_server:db_updated(Db2),
     {reply, ok, Db2, idle_limit()};
 
 handle_call({purge_docs, [], _}, _From, Db) ->
@@ -130,7 +130,7 @@ handle_call(Msg, From, Db) ->
 
 handle_cast({load_validation_funs, ValidationFuns}, Db) ->
     Db2 = Db#db{validate_doc_funs = ValidationFuns},
-    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    ok = couch_server:db_updated(Db2),
     {noreply, Db2, idle_limit()};
 handle_cast(start_compact, Db) ->
     case Db#db.compactor_pid of
@@ -143,7 +143,7 @@ handle_cast(start_compact, Db) ->
             Args = [Db#db.name, UpdateSeq],
             couch_log:info("Starting compaction for db \"~s\" at ~p", Args),
             {ok, Db2} = couch_db_engine:start_compaction(Db),
-            ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+            ok = couch_server:db_updated(Db2),
             {noreply, Db2, idle_limit()};
         _ ->
             % compact currently running, this is a no-op
@@ -175,7 +175,7 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts},
     NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
     try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts) of
     {ok, Db2, UpdatedDDocIds} ->
-        ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+        ok = couch_server:db_updated(Db2),
         case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of
             {Seq, Seq} -> ok;
             _ -> couch_event:notify(Db2#db.name, updated)
@@ -780,7 +780,7 @@ purge_docs(Db, PurgeReqs) ->
 
     {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos),
     Db2 = commit_data(Db1),
-    ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    ok = couch_server:db_updated(Db2),
     couch_event:notify(Db2#db.name, updated),
     {ok, Db2, Replies}.
 
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 6db3f74..f57bf3b 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -27,6 +27,7 @@
 -export([get_engine_extensions/0]).
 -export([get_engine_path/2]).
 -export([lock/2, unlock/1]).
+-export([db_updated/1]).
 
 % config_listener api
 -export([handle_config_change/5, handle_config_terminate/3]).
@@ -873,6 +874,10 @@ unlock(DbName) when is_binary(DbName) ->
     ok.
 
 
+db_updated(Db) ->
+    gen_server:call(couch_server, {db_updated, Db}, infinity).
+
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 


[couchdb] 03/03: ick

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch couch_server_sharding
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b3265de5b6576ff84099e242e6979ef29e7be69d
Author: Robert Newson <rn...@apache.org>
AuthorDate: Tue Feb 9 17:13:55 2021 +0000

    ick
---
 src/couch/src/couch_lru.erl         |  13 +++--
 src/couch/src/couch_primary_sup.erl |  20 ++-----
 src/couch/src/couch_server.erl      | 111 ++++++++++++++++++++----------------
 3 files changed, 74 insertions(+), 70 deletions(-)

diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl
index 6ad7c65..786e80f 100644
--- a/src/couch/src/couch_lru.erl
+++ b/src/couch/src/couch_lru.erl
@@ -43,17 +43,20 @@ close({Tree, _} = Cache) ->
 close_int(none, _) ->
     false;
 close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) ->
-    case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
+    CouchDbs = couch_server:name("couch_server", couch_server:num_for_db(DbName)),
+    CouchDbsPidToName = couch_server:name("couch_dbs_pid_to_name", couch_server:num_for_db(DbName)),
+
+    case ets:update_element(CouchDbs, DbName, {#entry.lock, locked}) of
     true ->
-        [#entry{db = Db, pid = Pid}] = ets:lookup(couch_dbs, DbName),
+        [#entry{db = Db, pid = Pid}] = ets:lookup(CouchDbs, DbName),
         case couch_db:is_idle(Db) of true ->
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, Pid),
+            true = ets:delete(CouchDbs, DbName),
+            true = ets:delete(CouchDbsPidToName, Pid),
             exit(Pid, kill),
             {true, {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}};
         false ->
             ElemSpec = {#entry.lock, unlocked},
-            true = ets:update_element(couch_dbs, DbName, ElemSpec),
+            true = ets:update_element(CouchDbs, DbName, ElemSpec),
             couch_stats:increment_counter([couchdb, couch_server, lru_skip]),
             close_int(gb_trees:next(Iter), update(DbName, Cache))
         end;
diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl
index 363e850..19a8c25 100644
--- a/src/couch/src/couch_primary_sup.erl
+++ b/src/couch/src/couch_primary_sup.erl
@@ -36,26 +36,14 @@ init([]) ->
 
 
 couch_servers() ->
-    case application:get_env(couch, concurrency, 10) of
-        undefined ->
-            [couch_server(undefined)];
-        N when is_integer(N), N > 1 ->
-            [couch_server(I) || I <- lists:seq(1, N)]
-    end.
-
+    N = application:get_env(couch, concurrency, 1),
+    [couch_server(I) || I <- lists:seq(1, N)].
 
 couch_server(Num) ->
-    Name = couch_server_name(Num),
+    Name = couch_server:name("couch_server", Num),
     {Name,
-     {couch_server, sup_start_link, [Name]},
+     {couch_server, sup_start_link, [Num]},
      permanent,
      brutal_kill,
      worker,
      [couch_server]}.
-
-
-couch_server_name(undefined) ->
-    couch_server;
-
-couch_server_name(N) when is_integer(N), N > 0 ->
-    list_to_atom("couch_server_" ++ integer_to_list(N)).
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 88052b1..a706e08 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -28,6 +28,7 @@
 -export([get_engine_path/2]).
 -export([lock/2, unlock/1]).
 -export([db_updated/1]).
+-export([name/2, num_for_db/1]).
 
 % config_listener api
 -export([handle_config_change/5, handle_config_terminate/3]).
@@ -45,7 +46,8 @@
     dbs_open=0,
     start_time="",
     update_lru_on_read=true,
-    lru = couch_lru:new()
+    lru = couch_lru:new(),
+    num
     }).
 
 dev_start() ->
@@ -76,8 +78,8 @@ get_stats() ->
             gen_server:call(couch_server, get_server),
     [{start_time, ?l2b(Time)}, {dbs_open, Open}].
 
-sup_start_link(Name) ->
-    gen_server:start_link({local, Name}, couch_server, [], []).
+sup_start_link(Num) ->
+    gen_server:start_link({local, name("couch_server", Num)}, couch_server, [Num], []).
 
 open(DbName, Options) ->
     try
@@ -89,7 +91,7 @@ open(DbName, Options) ->
 
 open_int(DbName, Options0) ->
     Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", num_for_db(DbName)), DbName) of
     [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
         update_lru(DbName, Entry#entry.db_options),
         {ok, Db1} = couch_db:incref(Db0),
@@ -98,7 +100,7 @@ open_int(DbName, Options0) ->
         Options = maybe_add_sys_db_callbacks(DbName, Options0),
         Timeout = couch_util:get_value(timeout, Options, infinity),
         Create = couch_util:get_value(create_if_missing, Options, false),
-        case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
+        case gen_server:call(name("couch_server", num_for_db(DbName)), {open, DbName, Options}, Timeout) of
         {ok, Db0} ->
             {ok, Db1} = couch_db:incref(Db0),
             couch_db:set_user_ctx(Db1, Ctx);
@@ -114,7 +116,7 @@ update_lru(DbName, Options) ->
     case config:get_boolean("couchdb", "update_lru_on_read", false) of
         true ->
             case lists:member(sys_db, Options) of
-                false -> gen_server:cast(couch_server, {update_lru, DbName});
+                false -> gen_server:cast(name("couch_server", num_for_db(DbName)), {update_lru, DbName});
                 true -> ok
             end;
         false ->
@@ -135,7 +137,7 @@ create(DbName, Options) ->
 create_int(DbName, Options0) ->
     Options = maybe_add_sys_db_callbacks(DbName, Options0),
     couch_partition:validate_dbname(DbName, Options),
-    case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
+    case gen_server:call(name("couch_server", num_for_db(DbName)), {create, DbName, Options}, infinity) of
     {ok, Db0} ->
         Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
         {ok, Db1} = couch_db:incref(Db0),
@@ -145,7 +147,7 @@ create_int(DbName, Options0) ->
     end.
 
 delete(DbName, Options) ->
-    gen_server:call(couch_server, {delete, DbName, Options}, infinity).
+    gen_server:call(name("couch_server", num_for_db(DbName)), {delete, DbName, Options}, infinity).
 
 
 exists(DbName) ->
@@ -228,13 +230,13 @@ hash_admin_passwords(Persist) ->
 close_db_if_idle(DbName) ->
     case ets:lookup(couch_dbs, DbName) of
         [#entry{}] ->
-            gen_server:cast(couch_server, {close_db_if_idle, DbName});
+            gen_server:cast(name("couch_server", num_for_db(DbName)), {close_db_if_idle, DbName});
         [] ->
             ok
     end.
 
 
-init([]) ->
+init([Num]) ->
     couch_util:set_mqd_off_heap(?MODULE),
     couch_util:set_process_priority(?MODULE, high),
 
@@ -271,15 +273,15 @@ init([]) ->
     ok = config:listen_for_changes(?MODULE, nil),
     ok = couch_file:init_delete_dir(RootDir),
     hash_admin_passwords(),
-    ets:new(couch_dbs, [
+    ets:new(name("couch_dbs", Num), [
         set,
         protected,
         named_table,
         {keypos, #entry.name},
         {read_concurrency, true}
     ]),
-    ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
-    ets:new(couch_dbs_locks, [
+    ets:new(name("couch_dbs_pid_to_name", Num), [set, protected, named_table]),
+    ets:new(name("couch_dbs_locks", Num), [
         set,
         public,
         named_table,
@@ -290,7 +292,8 @@ init([]) ->
                 engines = Engines,
                 max_dbs_open=MaxDbsOpen,
                 update_lru_on_read=UpdateLruOnRead,
-                start_time=couch_util:rfc1123_date()}}.
+                start_time=couch_util:rfc1123_date(),
+                num=Num}}.
 
 terminate(Reason, Srv) ->
     couch_log:error("couch_server terminating with ~p, state ~2048p",
@@ -302,7 +305,7 @@ terminate(Reason, Srv) ->
         if Db == undefined -> ok; true ->
             couch_util:shutdown_sync(couch_db:get_pid(Db))
         end
-    end, nil, couch_dbs),
+    end, nil, name("couch_dbs", Srv#server.num)),
     ok.
 
 handle_config_change("couchdb", "database_dir", _, _, _) ->
@@ -425,7 +428,7 @@ open_async(Server, From, DbName, Options) ->
         true -> create;
         false -> open
     end,
-    true = ets:insert(couch_dbs, #entry{
+    true = ets:insert(name("couch_dbs", Server#server.num), #entry{
         name = DbName,
         pid = Opener,
         lock = locked,
@@ -433,7 +436,7 @@ open_async(Server, From, DbName, Options) ->
         req_type = ReqType,
         db_options = Options
     }),
-    true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}),
+    true = ets:insert(name("couch_dbs_pid_to_name", Server#server.num), {Opener, DbName}),
     db_opened(Server, Options).
 
 open_async_int(Server, DbName, Options) ->
@@ -468,9 +471,9 @@ handle_call(reload_engines, _From, Server) ->
 handle_call(get_server, _From, Server) ->
     {reply, {ok, Server}, Server};
 handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
-    true = ets:delete(couch_dbs_pid_to_name, Opener),
+    true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Opener),
     DbPid = couch_db:get_pid(Db),
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
         [] ->
             % db was deleted during async open
             exit(DbPid, kill),
@@ -485,7 +488,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
                 _ ->
                     ok
             end,
-            true = ets:insert(couch_dbs, #entry{
+            true = ets:insert(name("couch_dbs", Server#server.num), #entry{
                 name = DbName,
                 db = Db,
                 pid = DbPid,
@@ -493,7 +496,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
                 db_options = Entry#entry.db_options,
                 start_time = couch_db:get_instance_start_time(Db)
             }),
-            true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}),
+            true = ets:insert(name("couch_dbs_pid_to_name", Server#server.num), {DbPid, DbName}),
             Lru = case couch_db:is_system_db(Db) of
                 false ->
                     couch_lru:insert(DbName, Server#server.lru);
@@ -511,14 +514,14 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
 handle_call({open_result, DbName, {error, eexist}}, From, Server) ->
     handle_call({open_result, DbName, file_exists}, From, Server);
 handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
         [] ->
             % db was deleted during async open
             {reply, ok, Server};
         [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
             [gen_server:reply(Waiter, Error) || Waiter <- Waiters],
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, Opener),
+            true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+            true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Opener),
             NewServer = case ReqType of
                 {create, DbName, Options, CrFrom} ->
                     open_async(Server, CrFrom, DbName, Options);
@@ -532,7 +535,7 @@ handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
             {reply, ok, Server}
     end;
 handle_call({open, DbName, Options}, From, Server) ->
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
     [] ->
         case make_room(Server, Options) of
         {ok, Server2} ->
@@ -541,7 +544,7 @@ handle_call({open, DbName, Options}, From, Server) ->
             {reply, CloseError, Server}
         end;
     [#entry{waiters = Waiters} = Entry] when is_list(Waiters) ->
-        true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}),
+        true = ets:insert(name("couch_dbs", Server#server.num), Entry#entry{waiters = [From | Waiters]}),
         NumWaiters = length(Waiters),
         if NumWaiters =< 10 orelse NumWaiters rem 10 /= 0 -> ok; true ->
             Fmt = "~b clients waiting to open db ~s",
@@ -552,7 +555,7 @@ handle_call({open, DbName, Options}, From, Server) ->
         {reply, {ok, Db}, Server}
     end;
 handle_call({create, DbName, Options}, From, Server) ->
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
     [] ->
         case make_room(Server, Options) of
         {ok, Server2} ->
@@ -567,7 +570,7 @@ handle_call({create, DbName, Options}, From, Server) ->
         % to wait while we figure out if it'll succeed.
         CrOptions = [create | Options],
         Req = {create, DbName, CrOptions, From},
-        true = ets:insert(couch_dbs, Entry#entry{req_type = Req}),
+        true = ets:insert(name("couch_dbs", Server#server.num), Entry#entry{req_type = Req}),
         {noreply, Server};
     [_AlreadyRunningDb] ->
         {reply, file_exists, Server}
@@ -577,17 +580,17 @@ handle_call({delete, DbName, Options}, _From, Server) ->
     case check_dbname(DbNameList) of
     ok ->
         Server2 =
-        case ets:lookup(couch_dbs, DbName) of
+        case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
         [] -> Server;
         [#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) ->
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, Pid),
+            true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+            true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
             exit(Pid, kill),
             [gen_server:reply(Waiter, not_found) || Waiter <- Waiters],
             db_closed(Server, Entry#entry.db_options);
         [#entry{pid = Pid} = Entry] ->
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, Pid),
+            true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+            true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
             exit(Pid, kill),
             db_closed(Server, Entry#entry.db_options)
         end,
@@ -616,9 +619,9 @@ handle_call({delete, DbName, Options}, _From, Server) ->
 handle_call({db_updated, Db}, _From, Server0) ->
     DbName = couch_db:name(Db),
     StartTime = couch_db:get_instance_start_time(Db),
-    Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of
+    Server = try ets:lookup_element(name("couch_dbs", Server0#server.num), DbName, #entry.start_time) of
         StartTime ->
-            true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}),
+            true = ets:update_element(name("couch_dbs", Server0#server.num), DbName, {#entry.db, Db}),
             Lru = case couch_db:is_system_db(Db) of
                 false -> couch_lru:update(DbName, Server0#server.lru);
                 true -> Server0#server.lru
@@ -636,19 +639,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} =
 handle_cast({update_lru, _DbName}, Server) ->
     {noreply, Server};
 handle_cast({close_db_if_idle, DbName}, Server) ->
-    case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
+    case ets:update_element(name("couch_dbs", Server#server.num), DbName, {#entry.lock, locked}) of
     true ->
-        [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs, DbName),
+        [#entry{db = Db, db_options = DbOpts}] = ets:lookup(name("couch_dbs", Server#server.num), DbName),
         case couch_db:is_idle(Db) of
         true ->
             DbPid = couch_db:get_pid(Db),
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, DbPid),
+            true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+            true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), DbPid),
             exit(DbPid, kill),
             {noreply, db_closed(Server, DbOpts)};
         false ->
             true = ets:update_element(
-                     couch_dbs, DbName, {#entry.lock, unlocked}),
+                name("couch_dbs", Server#server.num), DbName, {#entry.lock, unlocked}),
             {noreply, Server}
         end;
     false ->
@@ -664,9 +667,9 @@ code_change(_OldVsn, #server{}=State, _Extra) ->
 handle_info({'EXIT', _Pid, config_change}, Server) ->
     {stop, config_change, Server};
 handle_info({'EXIT', Pid, Reason}, Server) ->
-    case ets:lookup(couch_dbs_pid_to_name, Pid) of
+    case ets:lookup(name("couch_dbs_pid_to_name", Server#server.num), Pid) of
     [{Pid, DbName}] ->
-        [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName),
+        [#entry{waiters = Waiters} = Entry] = ets:lookup(name("couch_dbs", Server#server.num), DbName),
         if Reason /= snappy_nif_not_loaded -> ok; true ->
             Msg = io_lib:format("To open the database `~s`, Apache CouchDB "
                 "must be built with Erlang OTP R13B04 or higher.", [DbName]),
@@ -681,8 +684,8 @@ handle_info({'EXIT', Pid, Reason}, Server) ->
         if not is_list(Waiters) -> ok; true ->
             [gen_server:reply(Waiter, Reason) || Waiter <- Waiters]
         end,
-        true = ets:delete(couch_dbs, DbName),
-        true = ets:delete(couch_dbs_pid_to_name, Pid),
+        true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+        true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
         {noreply, db_closed(Server, Entry#entry.db_options)};
     [] ->
         {noreply, Server}
@@ -720,7 +723,7 @@ validate_open_or_create(DbName, Options) ->
             throw({?MODULE, EngineError})
     end,
 
-    case ets:lookup(couch_dbs_locks, DbName) of
+    case ets:lookup(name("couch_dbs_locks", num_for_db(DbName)), DbName) of
         [] ->
             ok;
         [{DbName, Reason}] ->
@@ -861,21 +864,31 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) ->
     end.
 
 lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) ->
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", num_for_db(DbName)), DbName) of
         [] ->
-            true = ets:insert(couch_dbs_locks, {DbName, Reason}),
+            true = ets:insert(name("couch_dbs_locks", num_for_db(DbName)), {DbName, Reason}),
             ok;
         [#entry{}] ->
             {error, already_opened}
     end.
 
 unlock(DbName) when is_binary(DbName) ->
-    true = ets:delete(couch_dbs_locks, DbName),
+    true = ets:delete(name("couch_dbs_locks", num_for_db(DbName)), DbName),
     ok.
 
 
 db_updated(Db) ->
-    gen_server:call(couch_server, {db_updated, Db}, infinity).
+    DbName = couch_db:name(Db),
+    gen_server:call(name("couch_server", num_for_db(DbName)), {db_updated, Db}, infinity).
+
+
+name(Name, N) when is_integer(N), N >= 0 ->
+    list_to_atom(Name ++ "_" ++ integer_to_list(N)).
+
+
+num_for_db(DbName) when is_binary(DbName) ->
+    N = application:get_env(couch, concurrency, 1),
+    1 + (erlang:crc32(DbName) rem N).
 
 
 -ifdef(TEST).


[couchdb] 02/03: supervise multiple couch_servers

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch couch_server_sharding
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 2896a7a4f9e08f604d04cb99e44100c60fc85c13
Author: Robert Newson <rn...@apache.org>
AuthorDate: Tue Feb 9 15:42:50 2021 +0000

    supervise multiple couch_servers
---
 src/couch/src/couch_primary_sup.erl | 35 +++++++++++++++++++++++++++--------
 src/couch/src/couch_server.erl      |  6 +++---
 2 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl
index dc2d9e5..363e850 100644
--- a/src/couch/src/couch_primary_sup.erl
+++ b/src/couch/src/couch_primary_sup.erl
@@ -30,13 +30,32 @@ init([]) ->
             permanent,
             brutal_kill,
             worker,
-            [couch_task_status]},
-        {couch_server,
-            {couch_server, sup_start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_server]}
-    ],
+            [couch_task_status]}
+    ] ++ couch_servers(),
     {ok, {{one_for_one, 10, 3600}, Children}}.
 
+
+couch_servers() ->
+    case application:get_env(couch, concurrency, 10) of
+        undefined ->
+            [couch_server(undefined)];
+        N when is_integer(N), N > 1 ->
+            [couch_server(I) || I <- lists:seq(1, N)]
+    end.
+
+
+couch_server(Num) ->
+    Name = couch_server_name(Num),
+    {Name,
+     {couch_server, sup_start_link, [Name]},
+     permanent,
+     brutal_kill,
+     worker,
+     [couch_server]}.
+
+
+couch_server_name(undefined) ->
+    couch_server;
+
+couch_server_name(N) when is_integer(N), N > 0 ->
+    list_to_atom("couch_server_" ++ integer_to_list(N)).
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index f57bf3b..88052b1 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -17,7 +17,7 @@
 
 -export([open/2,create/2,delete/2,get_version/0,get_version/1,get_git_sha/0,get_uuid/0]).
 -export([all_databases/0, all_databases/2]).
--export([init/1, handle_call/3,sup_start_link/0]).
+-export([init/1, handle_call/3,sup_start_link/1]).
 -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
 -export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
 -export([close_lru/0]).
@@ -76,8 +76,8 @@ get_stats() ->
             gen_server:call(couch_server, get_server),
     [{start_time, ?l2b(Time)}, {dbs_open, Open}].
 
-sup_start_link() ->
-    gen_server:start_link({local, couch_server}, couch_server, [], []).
+sup_start_link(Name) ->
+    gen_server:start_link({local, Name}, couch_server, [], []).
 
 open(DbName, Options) ->
     try