You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2021/02/10 18:01:31 UTC

[couchdb] 01/02: ick

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch couch_server_sharding
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b3265de5b6576ff84099e242e6979ef29e7be69d
Author: Robert Newson <rn...@apache.org>
AuthorDate: Tue Feb 9 17:13:55 2021 +0000

    ick
---
 src/couch/src/couch_lru.erl         |  13 +++--
 src/couch/src/couch_primary_sup.erl |  20 ++-----
 src/couch/src/couch_server.erl      | 111 ++++++++++++++++++++----------------
 3 files changed, 74 insertions(+), 70 deletions(-)

diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl
index 6ad7c65..786e80f 100644
--- a/src/couch/src/couch_lru.erl
+++ b/src/couch/src/couch_lru.erl
@@ -43,17 +43,20 @@ close({Tree, _} = Cache) ->
 close_int(none, _) ->
     false;
 close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) ->
-    case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
+    CouchDbs = couch_server:name("couch_dbs", couch_server:num_for_db(DbName)),
+    CouchDbsPidToName = couch_server:name("couch_dbs_pid_to_name", couch_server:num_for_db(DbName)),
+    % ETS tables are couch_dbs_N / couch_dbs_pid_to_name_N; couch_server_N is the process name.
+    case ets:update_element(CouchDbs, DbName, {#entry.lock, locked}) of
     true ->
-        [#entry{db = Db, pid = Pid}] = ets:lookup(couch_dbs, DbName),
+        [#entry{db = Db, pid = Pid}] = ets:lookup(CouchDbs, DbName),
         case couch_db:is_idle(Db) of true ->
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, Pid),
+            true = ets:delete(CouchDbs, DbName),
+            true = ets:delete(CouchDbsPidToName, Pid),
             exit(Pid, kill),
             {true, {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}};
         false ->
             ElemSpec = {#entry.lock, unlocked},
-            true = ets:update_element(couch_dbs, DbName, ElemSpec),
+            true = ets:update_element(CouchDbs, DbName, ElemSpec),
             couch_stats:increment_counter([couchdb, couch_server, lru_skip]),
             close_int(gb_trees:next(Iter), update(DbName, Cache))
         end;
diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl
index 363e850..19a8c25 100644
--- a/src/couch/src/couch_primary_sup.erl
+++ b/src/couch/src/couch_primary_sup.erl
@@ -36,26 +36,14 @@ init([]) ->
 
 
 couch_servers() ->
-    case application:get_env(couch, concurrency, 10) of
-        undefined ->
-            [couch_server(undefined)];
-        N when is_integer(N), N > 1 ->
-            [couch_server(I) || I <- lists:seq(1, N)]
-    end.
-
+    N = application:get_env(couch, concurrency, 1),  % shard count; 1 when the key is unset
+    [couch_server(I) || I <- lists:seq(1, N)].  % one child spec per shard, numbered 1..N
 
 couch_server(Num) ->
-    Name = couch_server_name(Num),
+    Name = couch_server:name("couch_server", Num),  % child id, e.g. couch_server_1
     {Name,
-     {couch_server, sup_start_link, [Name]},
+     {couch_server, sup_start_link, [Num]},  % the child is started with its shard number
      permanent,
      brutal_kill,
      worker,
      [couch_server]}.
-
-
-couch_server_name(undefined) ->
-    couch_server;
-
-couch_server_name(N) when is_integer(N), N > 0 ->
-    list_to_atom("couch_server_" ++ integer_to_list(N)).
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 88052b1..a706e08 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -28,6 +28,7 @@
 -export([get_engine_path/2]).
 -export([lock/2, unlock/1]).
 -export([db_updated/1]).
+-export([name/2, num_for_db/1]).
 
 % config_listener api
 -export([handle_config_change/5, handle_config_terminate/3]).
@@ -45,7 +46,8 @@
     dbs_open=0,
     start_time="",
     update_lru_on_read=true,
-    lru = couch_lru:new()
+    lru = couch_lru:new(),
+    num
     }).
 
 dev_start() ->
@@ -76,8 +78,8 @@ get_stats() ->
             gen_server:call(couch_server, get_server),
     [{start_time, ?l2b(Time)}, {dbs_open, Open}].
 
-sup_start_link(Name) ->
-    gen_server:start_link({local, Name}, couch_server, [], []).
+sup_start_link(Num) ->  % start shard Num: registered as couch_server_<Num>, init/1 receives [Num]
+    gen_server:start_link({local, name("couch_server", Num)}, couch_server, [Num], []).
 
 open(DbName, Options) ->
     try
@@ -89,7 +91,7 @@ open(DbName, Options) ->
 
 open_int(DbName, Options0) ->
     Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", num_for_db(DbName)), DbName) of
     [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
         update_lru(DbName, Entry#entry.db_options),
         {ok, Db1} = couch_db:incref(Db0),
@@ -98,7 +100,7 @@ open_int(DbName, Options0) ->
         Options = maybe_add_sys_db_callbacks(DbName, Options0),
         Timeout = couch_util:get_value(timeout, Options, infinity),
         Create = couch_util:get_value(create_if_missing, Options, false),
-        case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
+        case gen_server:call(name("couch_server", num_for_db(DbName)), {open, DbName, Options}, Timeout) of
         {ok, Db0} ->
             {ok, Db1} = couch_db:incref(Db0),
             couch_db:set_user_ctx(Db1, Ctx);
@@ -114,7 +116,7 @@ update_lru(DbName, Options) ->
     case config:get_boolean("couchdb", "update_lru_on_read", false) of
         true ->
             case lists:member(sys_db, Options) of
-                false -> gen_server:cast(couch_server, {update_lru, DbName});
+                false -> gen_server:cast(name("couch_server", num_for_db(DbName)), {update_lru, DbName});
                 true -> ok
             end;
         false ->
@@ -135,7 +137,7 @@ create(DbName, Options) ->
 create_int(DbName, Options0) ->
     Options = maybe_add_sys_db_callbacks(DbName, Options0),
     couch_partition:validate_dbname(DbName, Options),
-    case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
+    case gen_server:call(name("couch_server", num_for_db(DbName)), {create, DbName, Options}, infinity) of
     {ok, Db0} ->
         Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
         {ok, Db1} = couch_db:incref(Db0),
@@ -145,7 +147,7 @@ create_int(DbName, Options0) ->
     end.
 
 delete(DbName, Options) ->
-    gen_server:call(couch_server, {delete, DbName, Options}, infinity).
+    gen_server:call(name("couch_server", num_for_db(DbName)), {delete, DbName, Options}, infinity).  % sent to DbName's shard
 
 
 exists(DbName) ->
@@ -228,13 +230,13 @@ hash_admin_passwords(Persist) ->
 close_db_if_idle(DbName) ->
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", num_for_db(DbName)), DbName) of  % per-shard table; init/1 no longer creates couch_dbs
         [#entry{}] ->
-            gen_server:cast(couch_server, {close_db_if_idle, DbName});
+            gen_server:cast(name("couch_server", num_for_db(DbName)), {close_db_if_idle, DbName});
         [] ->
             ok
     end.
 
 
-init([]) ->
+init([Num]) ->
     couch_util:set_mqd_off_heap(?MODULE),
     couch_util:set_process_priority(?MODULE, high),
 
@@ -271,15 +273,15 @@ init([]) ->
     ok = config:listen_for_changes(?MODULE, nil),
     ok = couch_file:init_delete_dir(RootDir),
     hash_admin_passwords(),
-    ets:new(couch_dbs, [
+    ets:new(name("couch_dbs", Num), [
         set,
         protected,
         named_table,
         {keypos, #entry.name},
         {read_concurrency, true}
     ]),
-    ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
-    ets:new(couch_dbs_locks, [
+    ets:new(name("couch_dbs_pid_to_name", Num), [set, protected, named_table]),
+    ets:new(name("couch_dbs_locks", Num), [
         set,
         public,
         named_table,
@@ -290,7 +292,8 @@ init([]) ->
                 engines = Engines,
                 max_dbs_open=MaxDbsOpen,
                 update_lru_on_read=UpdateLruOnRead,
-                start_time=couch_util:rfc1123_date()}}.
+                start_time=couch_util:rfc1123_date(),
+                num=Num}}.
 
 terminate(Reason, Srv) ->
     couch_log:error("couch_server terminating with ~p, state ~2048p",
@@ -302,7 +305,7 @@ terminate(Reason, Srv) ->
         if Db == undefined -> ok; true ->
             couch_util:shutdown_sync(couch_db:get_pid(Db))
         end
-    end, nil, couch_dbs),
+    end, nil, name("couch_dbs", Srv#server.num)),
     ok.
 
 handle_config_change("couchdb", "database_dir", _, _, _) ->
@@ -425,7 +428,7 @@ open_async(Server, From, DbName, Options) ->
         true -> create;
         false -> open
     end,
-    true = ets:insert(couch_dbs, #entry{
+    true = ets:insert(name("couch_dbs", Server#server.num), #entry{
         name = DbName,
         pid = Opener,
         lock = locked,
@@ -433,7 +436,7 @@ open_async(Server, From, DbName, Options) ->
         req_type = ReqType,
         db_options = Options
     }),
-    true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}),
+    true = ets:insert(name("couch_dbs_pid_to_name", Server#server.num), {Opener, DbName}),
     db_opened(Server, Options).
 
 open_async_int(Server, DbName, Options) ->
@@ -468,9 +471,9 @@ handle_call(reload_engines, _From, Server) ->
 handle_call(get_server, _From, Server) ->
     {reply, {ok, Server}, Server};
 handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
-    true = ets:delete(couch_dbs_pid_to_name, Opener),
+    true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Opener),
     DbPid = couch_db:get_pid(Db),
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
         [] ->
             % db was deleted during async open
             exit(DbPid, kill),
@@ -485,7 +488,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
                 _ ->
                     ok
             end,
-            true = ets:insert(couch_dbs, #entry{
+            true = ets:insert(name("couch_dbs", Server#server.num), #entry{
                 name = DbName,
                 db = Db,
                 pid = DbPid,
@@ -493,7 +496,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
                 db_options = Entry#entry.db_options,
                 start_time = couch_db:get_instance_start_time(Db)
             }),
-            true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}),
+            true = ets:insert(name("couch_dbs_pid_to_name", Server#server.num), {DbPid, DbName}),
             Lru = case couch_db:is_system_db(Db) of
                 false ->
                     couch_lru:insert(DbName, Server#server.lru);
@@ -511,14 +514,14 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
 handle_call({open_result, DbName, {error, eexist}}, From, Server) ->
     handle_call({open_result, DbName, file_exists}, From, Server);
 handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
         [] ->
             % db was deleted during async open
             {reply, ok, Server};
         [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
             [gen_server:reply(Waiter, Error) || Waiter <- Waiters],
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, Opener),
+            true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+            true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Opener),
             NewServer = case ReqType of
                 {create, DbName, Options, CrFrom} ->
                     open_async(Server, CrFrom, DbName, Options);
@@ -532,7 +535,7 @@ handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
             {reply, ok, Server}
     end;
 handle_call({open, DbName, Options}, From, Server) ->
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
     [] ->
         case make_room(Server, Options) of
         {ok, Server2} ->
@@ -541,7 +544,7 @@ handle_call({open, DbName, Options}, From, Server) ->
             {reply, CloseError, Server}
         end;
     [#entry{waiters = Waiters} = Entry] when is_list(Waiters) ->
-        true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}),
+        true = ets:insert(name("couch_dbs", Server#server.num), Entry#entry{waiters = [From | Waiters]}),
         NumWaiters = length(Waiters),
         if NumWaiters =< 10 orelse NumWaiters rem 10 /= 0 -> ok; true ->
             Fmt = "~b clients waiting to open db ~s",
@@ -552,7 +555,7 @@ handle_call({open, DbName, Options}, From, Server) ->
         {reply, {ok, Db}, Server}
     end;
 handle_call({create, DbName, Options}, From, Server) ->
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
     [] ->
         case make_room(Server, Options) of
         {ok, Server2} ->
@@ -567,7 +570,7 @@ handle_call({create, DbName, Options}, From, Server) ->
         % to wait while we figure out if it'll succeed.
         CrOptions = [create | Options],
         Req = {create, DbName, CrOptions, From},
-        true = ets:insert(couch_dbs, Entry#entry{req_type = Req}),
+        true = ets:insert(name("couch_dbs", Server#server.num), Entry#entry{req_type = Req}),
         {noreply, Server};
     [_AlreadyRunningDb] ->
         {reply, file_exists, Server}
@@ -577,17 +580,17 @@ handle_call({delete, DbName, Options}, _From, Server) ->
     case check_dbname(DbNameList) of
     ok ->
         Server2 =
-        case ets:lookup(couch_dbs, DbName) of
+        case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
         [] -> Server;
         [#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) ->
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, Pid),
+            true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+            true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
             exit(Pid, kill),
             [gen_server:reply(Waiter, not_found) || Waiter <- Waiters],
             db_closed(Server, Entry#entry.db_options);
         [#entry{pid = Pid} = Entry] ->
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, Pid),
+            true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+            true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
             exit(Pid, kill),
             db_closed(Server, Entry#entry.db_options)
         end,
@@ -616,9 +619,9 @@ handle_call({delete, DbName, Options}, _From, Server) ->
 handle_call({db_updated, Db}, _From, Server0) ->
     DbName = couch_db:name(Db),
     StartTime = couch_db:get_instance_start_time(Db),
-    Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of
+    Server = try ets:lookup_element(name("couch_dbs", Server0#server.num), DbName, #entry.start_time) of
         StartTime ->
-            true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}),
+            true = ets:update_element(name("couch_dbs", Server0#server.num), DbName, {#entry.db, Db}),
             Lru = case couch_db:is_system_db(Db) of
                 false -> couch_lru:update(DbName, Server0#server.lru);
                 true -> Server0#server.lru
@@ -636,19 +639,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} =
 handle_cast({update_lru, _DbName}, Server) ->
     {noreply, Server};
 handle_cast({close_db_if_idle, DbName}, Server) ->
-    case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
+    case ets:update_element(name("couch_dbs", Server#server.num), DbName, {#entry.lock, locked}) of
     true ->
-        [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs, DbName),
+        [#entry{db = Db, db_options = DbOpts}] = ets:lookup(name("couch_dbs", Server#server.num), DbName),
         case couch_db:is_idle(Db) of
         true ->
             DbPid = couch_db:get_pid(Db),
-            true = ets:delete(couch_dbs, DbName),
-            true = ets:delete(couch_dbs_pid_to_name, DbPid),
+            true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+            true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), DbPid),
             exit(DbPid, kill),
             {noreply, db_closed(Server, DbOpts)};
         false ->
             true = ets:update_element(
-                     couch_dbs, DbName, {#entry.lock, unlocked}),
+                name("couch_dbs", Server#server.num), DbName, {#entry.lock, unlocked}),
             {noreply, Server}
         end;
     false ->
@@ -664,9 +667,9 @@ code_change(_OldVsn, #server{}=State, _Extra) ->
 handle_info({'EXIT', _Pid, config_change}, Server) ->
     {stop, config_change, Server};
 handle_info({'EXIT', Pid, Reason}, Server) ->
-    case ets:lookup(couch_dbs_pid_to_name, Pid) of
+    case ets:lookup(name("couch_dbs_pid_to_name", Server#server.num), Pid) of
     [{Pid, DbName}] ->
-        [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName),
+        [#entry{waiters = Waiters} = Entry] = ets:lookup(name("couch_dbs", Server#server.num), DbName),
         if Reason /= snappy_nif_not_loaded -> ok; true ->
             Msg = io_lib:format("To open the database `~s`, Apache CouchDB "
                 "must be built with Erlang OTP R13B04 or higher.", [DbName]),
@@ -681,8 +684,8 @@ handle_info({'EXIT', Pid, Reason}, Server) ->
         if not is_list(Waiters) -> ok; true ->
             [gen_server:reply(Waiter, Reason) || Waiter <- Waiters]
         end,
-        true = ets:delete(couch_dbs, DbName),
-        true = ets:delete(couch_dbs_pid_to_name, Pid),
+        true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+        true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
         {noreply, db_closed(Server, Entry#entry.db_options)};
     [] ->
         {noreply, Server}
@@ -720,7 +723,7 @@ validate_open_or_create(DbName, Options) ->
             throw({?MODULE, EngineError})
     end,
 
-    case ets:lookup(couch_dbs_locks, DbName) of
+    case ets:lookup(name("couch_dbs_locks", num_for_db(DbName)), DbName) of
         [] ->
             ok;
         [{DbName, Reason}] ->
@@ -861,21 +864,31 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) ->
     end.
 
 lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) ->
-    case ets:lookup(couch_dbs, DbName) of
+    case ets:lookup(name("couch_dbs", num_for_db(DbName)), DbName) of  % any entry on DbName's shard blocks the lock
         [] ->
-            true = ets:insert(couch_dbs_locks, {DbName, Reason}),
+            true = ets:insert(name("couch_dbs_locks", num_for_db(DbName)), {DbName, Reason}),  % record the lock and its reason
             ok;
         [#entry{}] ->
             {error, already_opened}
     end.
 
 unlock(DbName) when is_binary(DbName) ->
-    true = ets:delete(couch_dbs_locks, DbName),
+    true = ets:delete(name("couch_dbs_locks", num_for_db(DbName)), DbName),  % drop the lock row from DbName's shard table
     ok.
 
 
 db_updated(Db) ->
-    gen_server:call(couch_server, {db_updated, Db}, infinity).
+    DbName = couch_db:name(Db),  % the target shard is derived from the db's name
+    gen_server:call(name("couch_server", num_for_db(DbName)), {db_updated, Db}, infinity).
+
+%% Builds the atom Name ++ "_" ++ N, e.g. name("couch_dbs", 2) -> couch_dbs_2. Name is a string.
+name(Name, N) when is_integer(N), N >= 0 ->
+    list_to_atom(Name ++ "_" ++ integer_to_list(N)).
+
+%% Shard number for DbName: 1 + (crc32(DbName) rem N), N = couch/concurrency (default 1); 1..N for positive N.
+num_for_db(DbName) when is_binary(DbName) ->
+    N = application:get_env(couch, concurrency, 1),
+    1 + (erlang:crc32(DbName) rem N).
 
 
 -ifdef(TEST).