You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2021/02/10 18:01:30 UTC
[couchdb] branch couch_server_sharding updated (218b01a -> 54b1fb4)
This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a change to branch couch_server_sharding
in repository https://gitbox.apache.org/repos/asf/couchdb.git.
discard 218b01a ick
new b3265de ick
new 54b1fb4 de-ick
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
 * -- * -- B -- O -- O -- O   (218b01a)
            \
             N -- N -- N   refs/heads/couch_server_sharding (54b1fb4)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
src/couch/src/couch_lru.erl | 4 +-
src/couch/src/couch_primary_sup.erl | 24 +++---
src/couch/src/couch_server.erl | 163 ++++++++++++++++++++----------------
3 files changed, 103 insertions(+), 88 deletions(-)
[couchdb] 02/02: de-ick
Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch couch_server_sharding
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 54b1fb48d05671e51c0d829383a8d79876d14a6b
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Feb 10 17:36:31 2021 +0000
de-ick
---
src/couch/src/couch_lru.erl | 4 +-
src/couch/src/couch_primary_sup.erl | 17 ++--
src/couch/src/couch_server.erl | 167 ++++++++++++++++++++----------------
3 files changed, 106 insertions(+), 82 deletions(-)
diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl
index 786e80f..221dd85 100644
--- a/src/couch/src/couch_lru.erl
+++ b/src/couch/src/couch_lru.erl
@@ -43,8 +43,8 @@ close({Tree, _} = Cache) ->
close_int(none, _) ->
false;
close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) ->
- CouchDbs = couch_server:name("couch_server", couch_server:num_for_db(DbName)),
- CouchDbsPidToName = couch_server:name("couch_dbs_pid_to_name", couch_server:num_for_db(DbName)),
+ CouchDbs = couch_server:couch_server(DbName),
+ CouchDbsPidToName = couch_server:couch_dbs_pid_to_name(DbName),
case ets:update_element(CouchDbs, DbName, {#entry.lock, locked}) of
true ->
diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl
index 19a8c25..48c6d42 100644
--- a/src/couch/src/couch_primary_sup.erl
+++ b/src/couch/src/couch_primary_sup.erl
@@ -36,14 +36,15 @@ init([]) ->
couch_servers() ->
- N = application:get_env(couch, concurrency, 1),
+ N = couch_server:num_servers(),
[couch_server(I) || I <- lists:seq(1, N)].
-couch_server(Num) ->
- Name = couch_server:name("couch_server", Num),
+couch_server(N) ->
+ Name = couch_server:couch_server(N),
{Name,
- {couch_server, sup_start_link, [Num]},
- permanent,
- brutal_kill,
- worker,
- [couch_server]}.
+ {couch_server, sup_start_link, [N]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_server]
+ }.
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index a706e08..ec43b45 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -20,7 +20,6 @@
-export([init/1, handle_call/3,sup_start_link/1]).
-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
--export([close_lru/0]).
-export([close_db_if_idle/1]).
-export([delete_compaction_files/1]).
-export([exists/1]).
@@ -28,7 +27,7 @@
-export([get_engine_path/2]).
-export([lock/2, unlock/1]).
-export([db_updated/1]).
--export([name/2, num_for_db/1]).
+-export([num_servers/0, couch_server/1, couch_dbs_pid_to_name/1]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -47,7 +46,7 @@
start_time="",
update_lru_on_read=true,
lru = couch_lru:new(),
- num
+ n
}).
dev_start() ->
@@ -74,12 +73,16 @@ get_uuid() ->
end.
get_stats() ->
- {ok, #server{start_time=Time,dbs_open=Open}} =
- gen_server:call(couch_server, get_server),
+ Fun = fun(N, {TimeAcc, OpenAcc}) ->
+ {ok, #server{start_time=Time,dbs_open=Open}} =
+ gen_server:call(couch_server(N), get_server),
+ {max(Time, TimeAcc), Open + OpenAcc} end,
+ {Time, Open} =
+ lists:foldl(Fun, {0, 0}, lists:seq(1, num_couch_servers())),
[{start_time, ?l2b(Time)}, {dbs_open, Open}].
-sup_start_link(Num) ->
- gen_server:start_link({local, name("couch_server", Num)}, couch_server, [Num], []).
+sup_start_link(N) ->
+ gen_server:start_link({local, couch_server(N)}, couch_server, [N], []).
open(DbName, Options) ->
try
@@ -91,7 +94,7 @@ open(DbName, Options) ->
open_int(DbName, Options0) ->
Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
- case ets:lookup(name("couch_dbs", num_for_db(DbName)), DbName) of
+ case ets:lookup(couch_dbs(DbName), DbName) of
[#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
update_lru(DbName, Entry#entry.db_options),
{ok, Db1} = couch_db:incref(Db0),
@@ -100,7 +103,7 @@ open_int(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
Timeout = couch_util:get_value(timeout, Options, infinity),
Create = couch_util:get_value(create_if_missing, Options, false),
- case gen_server:call(name("couch_server", num_for_db(DbName)), {open, DbName, Options}, Timeout) of
+ case gen_server:call(couch_server(DbName), {open, DbName, Options}, Timeout) of
{ok, Db0} ->
{ok, Db1} = couch_db:incref(Db0),
couch_db:set_user_ctx(Db1, Ctx);
@@ -116,15 +119,13 @@ update_lru(DbName, Options) ->
case config:get_boolean("couchdb", "update_lru_on_read", false) of
true ->
case lists:member(sys_db, Options) of
- false -> gen_server:cast(name("couch_server", num_for_db(DbName)), {update_lru, DbName});
+ false -> gen_server:cast(couch_server(DbName), {update_lru, DbName});
true -> ok
end;
false ->
ok
end.
-close_lru() ->
- gen_server:call(couch_server, close_lru).
create(DbName, Options) ->
try
@@ -137,7 +138,7 @@ create(DbName, Options) ->
create_int(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
couch_partition:validate_dbname(DbName, Options),
- case gen_server:call(name("couch_server", num_for_db(DbName)), {create, DbName, Options}, infinity) of
+ case gen_server:call(couch_server(DbName), {create, DbName, Options}, infinity) of
{ok, Db0} ->
Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
{ok, Db1} = couch_db:incref(Db0),
@@ -147,7 +148,7 @@ create_int(DbName, Options0) ->
end.
delete(DbName, Options) ->
- gen_server:call(name("couch_server", num_for_db(DbName)), {delete, DbName, Options}, infinity).
+ gen_server:call(couch_server(DbName), {delete, DbName, Options}, infinity).
exists(DbName) ->
@@ -230,13 +231,13 @@ hash_admin_passwords(Persist) ->
close_db_if_idle(DbName) ->
case ets:lookup(couch_dbs, DbName) of
[#entry{}] ->
- gen_server:cast(name("couch_server", num_for_db(DbName)), {close_db_if_idle, DbName});
+ gen_server:cast(couch_server(DbName), {close_db_if_idle, DbName});
[] ->
ok
end.
-init([Num]) ->
+init([N]) ->
couch_util:set_mqd_off_heap(?MODULE),
couch_util:set_process_priority(?MODULE, high),
@@ -270,18 +271,18 @@ init([Num]) ->
config:get("couchdb", "max_dbs_open", integer_to_list(?MAX_DBS_OPEN))),
UpdateLruOnRead =
config:get("couchdb", "update_lru_on_read", "false") =:= "true",
- ok = config:listen_for_changes(?MODULE, nil),
+ ok = config:listen_for_changes(?MODULE, N),
ok = couch_file:init_delete_dir(RootDir),
hash_admin_passwords(),
- ets:new(name("couch_dbs", Num), [
+ ets:new(couch_dbs(N), [
set,
protected,
named_table,
{keypos, #entry.name},
{read_concurrency, true}
]),
- ets:new(name("couch_dbs_pid_to_name", Num), [set, protected, named_table]),
- ets:new(name("couch_dbs_locks", Num), [
+ ets:new(couch_dbs_pid_to_name(N), [set, protected, named_table]),
+ ets:new(couch_dbs_locks(N), [
set,
public,
named_table,
@@ -293,7 +294,7 @@ init([Num]) ->
max_dbs_open=MaxDbsOpen,
update_lru_on_read=UpdateLruOnRead,
start_time=couch_util:rfc1123_date(),
- num=Num}}.
+ n=N}}.
terminate(Reason, Srv) ->
couch_log:error("couch_server terminating with ~p, state ~2048p",
@@ -305,22 +306,22 @@ terminate(Reason, Srv) ->
if Db == undefined -> ok; true ->
couch_util:shutdown_sync(couch_db:get_pid(Db))
end
- end, nil, name("couch_dbs", Srv#server.num)),
+ end, nil, couch_dbs(Srv)),
ok.
handle_config_change("couchdb", "database_dir", _, _, _) ->
exit(whereis(couch_server), config_change),
remove_handler;
-handle_config_change("couchdb", "update_lru_on_read", "true", _, _) ->
- {ok, gen_server:call(couch_server,{set_update_lru_on_read,true})};
-handle_config_change("couchdb", "update_lru_on_read", _, _, _) ->
- {ok, gen_server:call(couch_server,{set_update_lru_on_read,false})};
-handle_config_change("couchdb", "max_dbs_open", Max, _, _) when is_list(Max) ->
- {ok, gen_server:call(couch_server,{set_max_dbs_open,list_to_integer(Max)})};
-handle_config_change("couchdb", "max_dbs_open", _, _, _) ->
- {ok, gen_server:call(couch_server,{set_max_dbs_open,?MAX_DBS_OPEN})};
-handle_config_change("couchdb_engines", _, _, _, _) ->
- {ok, gen_server:call(couch_server, reload_engines)};
+handle_config_change("couchdb", "update_lru_on_read", "true", _, N) ->
+ {ok, gen_server:call(couch_server(N),{set_update_lru_on_read,true})};
+handle_config_change("couchdb", "update_lru_on_read", _, _, N) ->
+ {ok, gen_server:call(couch_server(N),{set_update_lru_on_read,false})};
+handle_config_change("couchdb", "max_dbs_open", Max, _, N) when is_list(Max) ->
+ {ok, gen_server:call(couch_server(N),{set_max_dbs_open,list_to_integer(Max)})};
+handle_config_change("couchdb", "max_dbs_open", _, _, N) ->
+ {ok, gen_server:call(couch_server(N),{set_max_dbs_open,?MAX_DBS_OPEN})};
+handle_config_change("couchdb_engines", _, _, _, N) ->
+ {ok, gen_server:call(couch_server(N), reload_engines)};
handle_config_change("admins", _, _, Persist, _) ->
% spawn here so couch event manager doesn't deadlock
{ok, spawn(fun() -> hash_admin_passwords(Persist) end)};
@@ -347,7 +348,7 @@ all_databases() ->
{ok, lists:usort(DbList)}.
all_databases(Fun, Acc0) ->
- {ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server),
+ {ok, #server{root_dir=Root}} = gen_server:call(couch_server_1, get_server),
NormRoot = couch_util:normpath(Root),
Extensions = get_engine_extensions(),
ExtRegExp = "(" ++ string:join(Extensions, "|") ++ ")",
@@ -428,7 +429,7 @@ open_async(Server, From, DbName, Options) ->
true -> create;
false -> open
end,
- true = ets:insert(name("couch_dbs", Server#server.num), #entry{
+ true = ets:insert(couch_dbs(Server), #entry{
name = DbName,
pid = Opener,
lock = locked,
@@ -436,7 +437,7 @@ open_async(Server, From, DbName, Options) ->
req_type = ReqType,
db_options = Options
}),
- true = ets:insert(name("couch_dbs_pid_to_name", Server#server.num), {Opener, DbName}),
+ true = ets:insert(couch_dbs_pid_to_name(Server), {Opener, DbName}),
db_opened(Server, Options).
open_async_int(Server, DbName, Options) ->
@@ -471,9 +472,9 @@ handle_call(reload_engines, _From, Server) ->
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
- true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Opener),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Opener),
DbPid = couch_db:get_pid(Db),
- case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] ->
% db was deleted during async open
exit(DbPid, kill),
@@ -488,7 +489,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
_ ->
ok
end,
- true = ets:insert(name("couch_dbs", Server#server.num), #entry{
+ true = ets:insert(couch_dbs(Server), #entry{
name = DbName,
db = Db,
pid = DbPid,
@@ -496,7 +497,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
db_options = Entry#entry.db_options,
start_time = couch_db:get_instance_start_time(Db)
}),
- true = ets:insert(name("couch_dbs_pid_to_name", Server#server.num), {DbPid, DbName}),
+ true = ets:insert(couch_dbs_pid_to_name(Server), {DbPid, DbName}),
Lru = case couch_db:is_system_db(Db) of
false ->
couch_lru:insert(DbName, Server#server.lru);
@@ -514,14 +515,14 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
handle_call({open_result, DbName, {error, eexist}}, From, Server) ->
handle_call({open_result, DbName, file_exists}, From, Server);
handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
- case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] ->
% db was deleted during async open
{reply, ok, Server};
[#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
[gen_server:reply(Waiter, Error) || Waiter <- Waiters],
- true = ets:delete(name("couch_dbs", Server#server.num), DbName),
- true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Opener),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Opener),
NewServer = case ReqType of
{create, DbName, Options, CrFrom} ->
open_async(Server, CrFrom, DbName, Options);
@@ -535,7 +536,7 @@ handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
{reply, ok, Server}
end;
handle_call({open, DbName, Options}, From, Server) ->
- case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] ->
case make_room(Server, Options) of
{ok, Server2} ->
@@ -544,7 +545,7 @@ handle_call({open, DbName, Options}, From, Server) ->
{reply, CloseError, Server}
end;
[#entry{waiters = Waiters} = Entry] when is_list(Waiters) ->
- true = ets:insert(name("couch_dbs", Server#server.num), Entry#entry{waiters = [From | Waiters]}),
+ true = ets:insert(couch_dbs(Server), Entry#entry{waiters = [From | Waiters]}),
NumWaiters = length(Waiters),
if NumWaiters =< 10 orelse NumWaiters rem 10 /= 0 -> ok; true ->
Fmt = "~b clients waiting to open db ~s",
@@ -555,7 +556,7 @@ handle_call({open, DbName, Options}, From, Server) ->
{reply, {ok, Db}, Server}
end;
handle_call({create, DbName, Options}, From, Server) ->
- case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] ->
case make_room(Server, Options) of
{ok, Server2} ->
@@ -570,7 +571,7 @@ handle_call({create, DbName, Options}, From, Server) ->
% to wait while we figure out if it'll succeed.
CrOptions = [create | Options],
Req = {create, DbName, CrOptions, From},
- true = ets:insert(name("couch_dbs", Server#server.num), Entry#entry{req_type = Req}),
+ true = ets:insert(couch_dbs(Server), Entry#entry{req_type = Req}),
{noreply, Server};
[_AlreadyRunningDb] ->
{reply, file_exists, Server}
@@ -580,17 +581,17 @@ handle_call({delete, DbName, Options}, _From, Server) ->
case check_dbname(DbNameList) of
ok ->
Server2 =
- case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
+ case ets:lookup(couch_dbs(Server), DbName) of
[] -> Server;
[#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) ->
- true = ets:delete(name("couch_dbs", Server#server.num), DbName),
- true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Pid),
exit(Pid, kill),
[gen_server:reply(Waiter, not_found) || Waiter <- Waiters],
db_closed(Server, Entry#entry.db_options);
[#entry{pid = Pid} = Entry] ->
- true = ets:delete(name("couch_dbs", Server#server.num), DbName),
- true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Pid),
exit(Pid, kill),
db_closed(Server, Entry#entry.db_options)
end,
@@ -619,9 +620,9 @@ handle_call({delete, DbName, Options}, _From, Server) ->
handle_call({db_updated, Db}, _From, Server0) ->
DbName = couch_db:name(Db),
StartTime = couch_db:get_instance_start_time(Db),
- Server = try ets:lookup_element(name("couch_dbs", Server0#server.num), DbName, #entry.start_time) of
+ Server = try ets:lookup_element(couch_dbs(Server0), DbName, #entry.start_time) of
StartTime ->
- true = ets:update_element(name("couch_dbs", Server0#server.num), DbName, {#entry.db, Db}),
+ true = ets:update_element(couch_dbs(Server0), DbName, {#entry.db, Db}),
Lru = case couch_db:is_system_db(Db) of
false -> couch_lru:update(DbName, Server0#server.lru);
true -> Server0#server.lru
@@ -639,19 +640,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} =
handle_cast({update_lru, _DbName}, Server) ->
{noreply, Server};
handle_cast({close_db_if_idle, DbName}, Server) ->
- case ets:update_element(name("couch_dbs", Server#server.num), DbName, {#entry.lock, locked}) of
+ case ets:update_element(couch_dbs(Server), DbName, {#entry.lock, locked}) of
true ->
- [#entry{db = Db, db_options = DbOpts}] = ets:lookup(name("couch_dbs", Server#server.num), DbName),
+ [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs(Server), DbName),
case couch_db:is_idle(Db) of
true ->
DbPid = couch_db:get_pid(Db),
- true = ets:delete(name("couch_dbs", Server#server.num), DbName),
- true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), DbPid),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), DbPid),
exit(DbPid, kill),
{noreply, db_closed(Server, DbOpts)};
false ->
true = ets:update_element(
- name("couch_dbs", Server#server.num), DbName, {#entry.lock, unlocked}),
+ couch_dbs(Server), DbName, {#entry.lock, unlocked}),
{noreply, Server}
end;
false ->
@@ -667,9 +668,9 @@ code_change(_OldVsn, #server{}=State, _Extra) ->
handle_info({'EXIT', _Pid, config_change}, Server) ->
{stop, config_change, Server};
handle_info({'EXIT', Pid, Reason}, Server) ->
- case ets:lookup(name("couch_dbs_pid_to_name", Server#server.num), Pid) of
+ case ets:lookup(couch_dbs_pid_to_name(Server), Pid) of
[{Pid, DbName}] ->
- [#entry{waiters = Waiters} = Entry] = ets:lookup(name("couch_dbs", Server#server.num), DbName),
+ [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs(Server), DbName),
if Reason /= snappy_nif_not_loaded -> ok; true ->
Msg = io_lib:format("To open the database `~s`, Apache CouchDB "
"must be built with Erlang OTP R13B04 or higher.", [DbName]),
@@ -684,8 +685,8 @@ handle_info({'EXIT', Pid, Reason}, Server) ->
if not is_list(Waiters) -> ok; true ->
[gen_server:reply(Waiter, Reason) || Waiter <- Waiters]
end,
- true = ets:delete(name("couch_dbs", Server#server.num), DbName),
- true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
+ true = ets:delete(couch_dbs(Server), DbName),
+ true = ets:delete(couch_dbs_pid_to_name(Server), Pid),
{noreply, db_closed(Server, Entry#entry.db_options)};
[] ->
{noreply, Server}
@@ -723,7 +724,7 @@ validate_open_or_create(DbName, Options) ->
throw({?MODULE, EngineError})
end,
- case ets:lookup(name("couch_dbs_locks", num_for_db(DbName)), DbName) of
+ case ets:lookup(couch_dbs_locks(DbName), DbName) of
[] ->
ok;
[{DbName, Reason}] ->
@@ -864,31 +865,53 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) ->
end.
lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) ->
- case ets:lookup(name("couch_dbs", num_for_db(DbName)), DbName) of
+ case ets:lookup(couch_dbs(DbName), DbName) of
[] ->
- true = ets:insert(name("couch_dbs_locks", num_for_db(DbName)), {DbName, Reason}),
+ true = ets:insert(couch_dbs_locks(DbName), {DbName, Reason}),
ok;
[#entry{}] ->
{error, already_opened}
end.
unlock(DbName) when is_binary(DbName) ->
- true = ets:delete(name("couch_dbs_locks", num_for_db(DbName)), DbName),
+ true = ets:delete(couch_dbs_locks(DbName), DbName),
ok.
db_updated(Db) ->
DbName = couch_db:name(Db),
- gen_server:call(name("couch_server", num_for_db(DbName)), {db_updated, Db}, infinity).
+ gen_server:call(couch_server(DbName), {db_updated, Db}, infinity).
+
+
+couch_server(Arg) ->
+ name("couch_server", Arg).
+
+
+couch_dbs(Arg) ->
+ name("couch_dbs", Arg).
+
+
+couch_dbs_pid_to_name(Arg) ->
+ name("couch_dbs_pid_to_name", Arg).
+
+
+couch_dbs_locks(Arg) ->
+ name("couch_dbs_locks", Arg).
+
+
+name(BaseName, DbName) when is_binary(DbName) ->
+ N = 1 + (erlang:crc32(DbName) rem num_servers()),
+ name(BaseName, N);
+name(BaseName, #server{} = Srv) ->
+ name(BaseName, Srv#server.n);
-name(Name, N) when is_integer(N), N >= 0 ->
- list_to_atom(Name ++ "_" ++ integer_to_list(N)).
+name(BaseName, N) when is_integer(N), N > 0 ->
+ list_to_atom(BaseName ++ "_" ++ integer_to_list(N)).
-num_for_db(DbName) when is_binary(DbName) ->
- N = application:get_env(couch, concurrency, 1),
- 1 + (erlang:crc32(DbName) rem N).
+num_servers() ->
+ erlang:system_info(schedulers_online).
-ifdef(TEST).
[couchdb] 01/02: ick
Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch couch_server_sharding
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit b3265de5b6576ff84099e242e6979ef29e7be69d
Author: Robert Newson <rn...@apache.org>
AuthorDate: Tue Feb 9 17:13:55 2021 +0000
ick
---
src/couch/src/couch_lru.erl | 13 +++--
src/couch/src/couch_primary_sup.erl | 20 ++-----
src/couch/src/couch_server.erl | 111 ++++++++++++++++++++----------------
3 files changed, 74 insertions(+), 70 deletions(-)
diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl
index 6ad7c65..786e80f 100644
--- a/src/couch/src/couch_lru.erl
+++ b/src/couch/src/couch_lru.erl
@@ -43,17 +43,20 @@ close({Tree, _} = Cache) ->
close_int(none, _) ->
false;
close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) ->
- case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
+ CouchDbs = couch_server:name("couch_server", couch_server:num_for_db(DbName)),
+ CouchDbsPidToName = couch_server:name("couch_dbs_pid_to_name", couch_server:num_for_db(DbName)),
+
+ case ets:update_element(CouchDbs, DbName, {#entry.lock, locked}) of
true ->
- [#entry{db = Db, pid = Pid}] = ets:lookup(couch_dbs, DbName),
+ [#entry{db = Db, pid = Pid}] = ets:lookup(CouchDbs, DbName),
case couch_db:is_idle(Db) of true ->
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Pid),
+ true = ets:delete(CouchDbs, DbName),
+ true = ets:delete(CouchDbsPidToName, Pid),
exit(Pid, kill),
{true, {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}};
false ->
ElemSpec = {#entry.lock, unlocked},
- true = ets:update_element(couch_dbs, DbName, ElemSpec),
+ true = ets:update_element(CouchDbs, DbName, ElemSpec),
couch_stats:increment_counter([couchdb, couch_server, lru_skip]),
close_int(gb_trees:next(Iter), update(DbName, Cache))
end;
diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl
index 363e850..19a8c25 100644
--- a/src/couch/src/couch_primary_sup.erl
+++ b/src/couch/src/couch_primary_sup.erl
@@ -36,26 +36,14 @@ init([]) ->
couch_servers() ->
- case application:get_env(couch, concurrency, 10) of
- undefined ->
- [couch_server(undefined)];
- N when is_integer(N), N > 1 ->
- [couch_server(I) || I <- lists:seq(1, N)]
- end.
-
+ N = application:get_env(couch, concurrency, 1),
+ [couch_server(I) || I <- lists:seq(1, N)].
couch_server(Num) ->
- Name = couch_server_name(Num),
+ Name = couch_server:name("couch_server", Num),
{Name,
- {couch_server, sup_start_link, [Name]},
+ {couch_server, sup_start_link, [Num]},
permanent,
brutal_kill,
worker,
[couch_server]}.
-
-
-couch_server_name(undefined) ->
- couch_server;
-
-couch_server_name(N) when is_integer(N), N > 0 ->
- list_to_atom("couch_server_" ++ integer_to_list(N)).
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 88052b1..a706e08 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -28,6 +28,7 @@
-export([get_engine_path/2]).
-export([lock/2, unlock/1]).
-export([db_updated/1]).
+-export([name/2, num_for_db/1]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -45,7 +46,8 @@
dbs_open=0,
start_time="",
update_lru_on_read=true,
- lru = couch_lru:new()
+ lru = couch_lru:new(),
+ num
}).
dev_start() ->
@@ -76,8 +78,8 @@ get_stats() ->
gen_server:call(couch_server, get_server),
[{start_time, ?l2b(Time)}, {dbs_open, Open}].
-sup_start_link(Name) ->
- gen_server:start_link({local, Name}, couch_server, [], []).
+sup_start_link(Num) ->
+ gen_server:start_link({local, name("couch_server", Num)}, couch_server, [Num], []).
open(DbName, Options) ->
try
@@ -89,7 +91,7 @@ open(DbName, Options) ->
open_int(DbName, Options0) ->
Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(name("couch_dbs", num_for_db(DbName)), DbName) of
[#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
update_lru(DbName, Entry#entry.db_options),
{ok, Db1} = couch_db:incref(Db0),
@@ -98,7 +100,7 @@ open_int(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
Timeout = couch_util:get_value(timeout, Options, infinity),
Create = couch_util:get_value(create_if_missing, Options, false),
- case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
+ case gen_server:call(name("couch_server", num_for_db(DbName)), {open, DbName, Options}, Timeout) of
{ok, Db0} ->
{ok, Db1} = couch_db:incref(Db0),
couch_db:set_user_ctx(Db1, Ctx);
@@ -114,7 +116,7 @@ update_lru(DbName, Options) ->
case config:get_boolean("couchdb", "update_lru_on_read", false) of
true ->
case lists:member(sys_db, Options) of
- false -> gen_server:cast(couch_server, {update_lru, DbName});
+ false -> gen_server:cast(name("couch_server", num_for_db(DbName)), {update_lru, DbName});
true -> ok
end;
false ->
@@ -135,7 +137,7 @@ create(DbName, Options) ->
create_int(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
couch_partition:validate_dbname(DbName, Options),
- case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
+ case gen_server:call(name("couch_server", num_for_db(DbName)), {create, DbName, Options}, infinity) of
{ok, Db0} ->
Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
{ok, Db1} = couch_db:incref(Db0),
@@ -145,7 +147,7 @@ create_int(DbName, Options0) ->
end.
delete(DbName, Options) ->
- gen_server:call(couch_server, {delete, DbName, Options}, infinity).
+ gen_server:call(name("couch_server", num_for_db(DbName)), {delete, DbName, Options}, infinity).
exists(DbName) ->
@@ -228,13 +230,13 @@ hash_admin_passwords(Persist) ->
close_db_if_idle(DbName) ->
case ets:lookup(couch_dbs, DbName) of
[#entry{}] ->
- gen_server:cast(couch_server, {close_db_if_idle, DbName});
+ gen_server:cast(name("couch_server", num_for_db(DbName)), {close_db_if_idle, DbName});
[] ->
ok
end.
-init([]) ->
+init([Num]) ->
couch_util:set_mqd_off_heap(?MODULE),
couch_util:set_process_priority(?MODULE, high),
@@ -271,15 +273,15 @@ init([]) ->
ok = config:listen_for_changes(?MODULE, nil),
ok = couch_file:init_delete_dir(RootDir),
hash_admin_passwords(),
- ets:new(couch_dbs, [
+ ets:new(name("couch_dbs", Num), [
set,
protected,
named_table,
{keypos, #entry.name},
{read_concurrency, true}
]),
- ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
- ets:new(couch_dbs_locks, [
+ ets:new(name("couch_dbs_pid_to_name", Num), [set, protected, named_table]),
+ ets:new(name("couch_dbs_locks", Num), [
set,
public,
named_table,
@@ -290,7 +292,8 @@ init([]) ->
engines = Engines,
max_dbs_open=MaxDbsOpen,
update_lru_on_read=UpdateLruOnRead,
- start_time=couch_util:rfc1123_date()}}.
+ start_time=couch_util:rfc1123_date(),
+ num=Num}}.
terminate(Reason, Srv) ->
couch_log:error("couch_server terminating with ~p, state ~2048p",
@@ -302,7 +305,7 @@ terminate(Reason, Srv) ->
if Db == undefined -> ok; true ->
couch_util:shutdown_sync(couch_db:get_pid(Db))
end
- end, nil, couch_dbs),
+ end, nil, name("couch_dbs", Srv#server.num)),
ok.
handle_config_change("couchdb", "database_dir", _, _, _) ->
@@ -425,7 +428,7 @@ open_async(Server, From, DbName, Options) ->
true -> create;
false -> open
end,
- true = ets:insert(couch_dbs, #entry{
+ true = ets:insert(name("couch_dbs", Server#server.num), #entry{
name = DbName,
pid = Opener,
lock = locked,
@@ -433,7 +436,7 @@ open_async(Server, From, DbName, Options) ->
req_type = ReqType,
db_options = Options
}),
- true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}),
+ true = ets:insert(name("couch_dbs_pid_to_name", Server#server.num), {Opener, DbName}),
db_opened(Server, Options).
open_async_int(Server, DbName, Options) ->
@@ -468,9 +471,9 @@ handle_call(reload_engines, _From, Server) ->
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
- true = ets:delete(couch_dbs_pid_to_name, Opener),
+ true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Opener),
DbPid = couch_db:get_pid(Db),
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
[] ->
% db was deleted during async open
exit(DbPid, kill),
@@ -485,7 +488,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
_ ->
ok
end,
- true = ets:insert(couch_dbs, #entry{
+ true = ets:insert(name("couch_dbs", Server#server.num), #entry{
name = DbName,
db = Db,
pid = DbPid,
@@ -493,7 +496,7 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
db_options = Entry#entry.db_options,
start_time = couch_db:get_instance_start_time(Db)
}),
- true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}),
+ true = ets:insert(name("couch_dbs_pid_to_name", Server#server.num), {DbPid, DbName}),
Lru = case couch_db:is_system_db(Db) of
false ->
couch_lru:insert(DbName, Server#server.lru);
@@ -511,14 +514,14 @@ handle_call({open_result, DbName, {ok, Db}}, {Opener, _}, Server) ->
handle_call({open_result, DbName, {error, eexist}}, From, Server) ->
handle_call({open_result, DbName, file_exists}, From, Server);
handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
[] ->
% db was deleted during async open
{reply, ok, Server};
[#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
[gen_server:reply(Waiter, Error) || Waiter <- Waiters],
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Opener),
+ true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+ true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Opener),
NewServer = case ReqType of
{create, DbName, Options, CrFrom} ->
open_async(Server, CrFrom, DbName, Options);
@@ -532,7 +535,7 @@ handle_call({open_result, DbName, Error}, {Opener, _}, Server) ->
{reply, ok, Server}
end;
handle_call({open, DbName, Options}, From, Server) ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
[] ->
case make_room(Server, Options) of
{ok, Server2} ->
@@ -541,7 +544,7 @@ handle_call({open, DbName, Options}, From, Server) ->
{reply, CloseError, Server}
end;
[#entry{waiters = Waiters} = Entry] when is_list(Waiters) ->
- true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}),
+ true = ets:insert(name("couch_dbs", Server#server.num), Entry#entry{waiters = [From | Waiters]}),
NumWaiters = length(Waiters),
if NumWaiters =< 10 orelse NumWaiters rem 10 /= 0 -> ok; true ->
Fmt = "~b clients waiting to open db ~s",
@@ -552,7 +555,7 @@ handle_call({open, DbName, Options}, From, Server) ->
{reply, {ok, Db}, Server}
end;
handle_call({create, DbName, Options}, From, Server) ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
[] ->
case make_room(Server, Options) of
{ok, Server2} ->
@@ -567,7 +570,7 @@ handle_call({create, DbName, Options}, From, Server) ->
% to wait while we figure out if it'll succeed.
CrOptions = [create | Options],
Req = {create, DbName, CrOptions, From},
- true = ets:insert(couch_dbs, Entry#entry{req_type = Req}),
+ true = ets:insert(name("couch_dbs", Server#server.num), Entry#entry{req_type = Req}),
{noreply, Server};
[_AlreadyRunningDb] ->
{reply, file_exists, Server}
@@ -577,17 +580,17 @@ handle_call({delete, DbName, Options}, _From, Server) ->
case check_dbname(DbNameList) of
ok ->
Server2 =
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(name("couch_dbs", Server#server.num), DbName) of
[] -> Server;
[#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) ->
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Pid),
+ true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+ true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
exit(Pid, kill),
[gen_server:reply(Waiter, not_found) || Waiter <- Waiters],
db_closed(Server, Entry#entry.db_options);
[#entry{pid = Pid} = Entry] ->
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Pid),
+ true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+ true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
exit(Pid, kill),
db_closed(Server, Entry#entry.db_options)
end,
@@ -616,9 +619,9 @@ handle_call({delete, DbName, Options}, _From, Server) ->
handle_call({db_updated, Db}, _From, Server0) ->
DbName = couch_db:name(Db),
StartTime = couch_db:get_instance_start_time(Db),
- Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of
+ Server = try ets:lookup_element(name("couch_dbs", Server0#server.num), DbName, #entry.start_time) of
StartTime ->
- true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}),
+ true = ets:update_element(name("couch_dbs", Server0#server.num), DbName, {#entry.db, Db}),
Lru = case couch_db:is_system_db(Db) of
false -> couch_lru:update(DbName, Server0#server.lru);
true -> Server0#server.lru
@@ -636,19 +639,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} =
handle_cast({update_lru, _DbName}, Server) ->
{noreply, Server};
handle_cast({close_db_if_idle, DbName}, Server) ->
- case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
+ case ets:update_element(name("couch_dbs", Server#server.num), DbName, {#entry.lock, locked}) of
true ->
- [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs, DbName),
+ [#entry{db = Db, db_options = DbOpts}] = ets:lookup(name("couch_dbs", Server#server.num), DbName),
case couch_db:is_idle(Db) of
true ->
DbPid = couch_db:get_pid(Db),
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, DbPid),
+ true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+ true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), DbPid),
exit(DbPid, kill),
{noreply, db_closed(Server, DbOpts)};
false ->
true = ets:update_element(
- couch_dbs, DbName, {#entry.lock, unlocked}),
+ name("couch_dbs", Server#server.num), DbName, {#entry.lock, unlocked}),
{noreply, Server}
end;
false ->
@@ -664,9 +667,9 @@ code_change(_OldVsn, #server{}=State, _Extra) ->
handle_info({'EXIT', _Pid, config_change}, Server) ->
{stop, config_change, Server};
handle_info({'EXIT', Pid, Reason}, Server) ->
- case ets:lookup(couch_dbs_pid_to_name, Pid) of
+ case ets:lookup(name("couch_dbs_pid_to_name", Server#server.num), Pid) of
[{Pid, DbName}] ->
- [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName),
+ [#entry{waiters = Waiters} = Entry] = ets:lookup(name("couch_dbs", Server#server.num), DbName),
if Reason /= snappy_nif_not_loaded -> ok; true ->
Msg = io_lib:format("To open the database `~s`, Apache CouchDB "
"must be built with Erlang OTP R13B04 or higher.", [DbName]),
@@ -681,8 +684,8 @@ handle_info({'EXIT', Pid, Reason}, Server) ->
if not is_list(Waiters) -> ok; true ->
[gen_server:reply(Waiter, Reason) || Waiter <- Waiters]
end,
- true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Pid),
+ true = ets:delete(name("couch_dbs", Server#server.num), DbName),
+ true = ets:delete(name("couch_dbs_pid_to_name", Server#server.num), Pid),
{noreply, db_closed(Server, Entry#entry.db_options)};
[] ->
{noreply, Server}
@@ -720,7 +723,7 @@ validate_open_or_create(DbName, Options) ->
throw({?MODULE, EngineError})
end,
- case ets:lookup(couch_dbs_locks, DbName) of
+ case ets:lookup(name("couch_dbs_locks", num_for_db(DbName)), DbName) of
[] ->
ok;
[{DbName, Reason}] ->
@@ -861,21 +864,31 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) ->
end.
lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) ->
- case ets:lookup(couch_dbs, DbName) of
+ case ets:lookup(name("couch_dbs", num_for_db(DbName)), DbName) of
[] ->
- true = ets:insert(couch_dbs_locks, {DbName, Reason}),
+ true = ets:insert(name("couch_dbs_locks", num_for_db(DbName)), {DbName, Reason}),
ok;
[#entry{}] ->
{error, already_opened}
end.
unlock(DbName) when is_binary(DbName) ->
- true = ets:delete(couch_dbs_locks, DbName),
+ true = ets:delete(name("couch_dbs_locks", num_for_db(DbName)), DbName),
ok.
db_updated(Db) ->
- gen_server:call(couch_server, {db_updated, Db}, infinity).
+ DbName = couch_db:name(Db),
+ gen_server:call(name("couch_server", num_for_db(DbName)), {db_updated, Db}, infinity).
+
+
+name(Name, N) when is_integer(N), N >= 0 ->
+ list_to_atom(Name ++ "_" ++ integer_to_list(N)).
+
+
+num_for_db(DbName) when is_binary(DbName) ->
+ N = application:get_env(couch, concurrency, 1),
+ 1 + (erlang:crc32(DbName) rem N).
-ifdef(TEST).