You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2019/02/28 18:50:44 UTC
[couchdb] 01/01: WIP: IOQ2 per shard/user
This is an automated email from the ASF dual-hosted git repository.
chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 1644b1dfc93106792631a7688ed5bd413dddd03b
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Feb 28 18:45:15 2019 +0000
WIP: IOQ2 per shard/user
---
Makefile | 2 +-
src/couch/src/couch_bt_engine.erl | 5 ++
src/couch/src/couch_bt_engine_compactor.erl | 10 ++++
src/couch/src/couch_db.erl | 7 ++-
src/couch/src/couch_db_engine.erl | 6 +++
src/couch/src/couch_db_updater.erl | 10 ++--
src/couch/src/couch_server.erl | 59 ++++++++++++++--------
src/couch/src/couch_server_int.hrl | 1 +
src/couch/test/couch_server_tests.erl | 2 +-
src/couch_mrview/src/couch_mrview_updater.erl | 12 ++++-
src/couch_pse_tests/src/cpse_test_ref_counting.erl | 9 ++--
src/fabric/src/fabric_rpc.erl | 4 ++
12 files changed, 94 insertions(+), 33 deletions(-)
diff --git a/Makefile b/Makefile
index 208d169..74b925b 100644
--- a/Makefile
+++ b/Makefile
@@ -165,7 +165,7 @@ eunit: export COUCHDB_QUERY_SERVER_JAVASCRIPT = $(shell pwd)/bin/couchjs $(shell
eunit: couch
@$(REBAR) setup_eunit 2> /dev/null
@for dir in $(subdirs); do \
- $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir; \
+ $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir || exit 1; \
done
setup-eunit: export BUILDDIR = $(shell pwd)
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 7b33c42..d18afb9 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -45,6 +45,7 @@
get_partition_info/2,
get_update_seq/1,
get_uuid/1,
+ get_fd_pid/1,
set_revs_limit/2,
set_purge_infos_limit/2,
@@ -348,6 +349,10 @@ get_uuid(#st{header = Header}) ->
couch_bt_engine_header:get(Header, uuid).
+get_fd_pid(#st{fd = Fd}) ->
+ Fd.
+
+
set_revs_limit(#st{header = Header} = St, RevsLimit) ->
NewSt = St#st{
header = couch_bt_engine_header:set(Header, [
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 737f772..ed14348 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -39,6 +39,7 @@
start(#st{} = St, DbName, Options, Parent) ->
erlang:put(io_priority, {db_compact, DbName}),
#st{
+ fd = FdPid,
filepath = FilePath,
header = Header
} = St,
@@ -46,6 +47,15 @@ start(#st{} = St, DbName, Options, Parent) ->
couch_db_engine:trigger_on_compact(DbName),
+ IOQPid = case lists:member(sys_db, Options) of
+ false ->
+ Ctx = couch_util:get_value(user_ctx, Options, undefined),
+ ioq:fetch_pid_for(DbName, Ctx, FdPid);
+ true ->
+ undefined
+ end,
+ ok = ioq:set_pid_for(FdPid, IOQPid),
+
{ok, NewSt, DName, DFd, MFd, Retry} =
open_compaction_files(Header, FilePath, Options),
erlang:monitor(process, MFd),
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 74f4a09..56ad6b2 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -45,6 +45,7 @@
get_filepath/1,
get_instance_start_time/1,
get_pid/1,
+ get_fd_pid/1,
get_revs_limit/1,
get_security/1,
get_update_seq/1,
@@ -249,7 +250,8 @@ monitored_by(Db) ->
case couch_db_engine:monitored_by(Db) of
Pids when is_list(Pids) ->
PidTracker = whereis(couch_stats_process_tracker),
- Pids -- [Db#db.main_pid, PidTracker];
+ IOQOpener = whereis(ioq_opener),
+ Pids -- [Db#db.main_pid, PidTracker, IOQOpener];
undefined ->
[]
end.
@@ -556,6 +558,9 @@ get_purge_infos_limit(#db{}=Db) ->
get_pid(#db{main_pid = Pid}) ->
Pid.
+get_fd_pid(#db{}=Db) ->
+ couch_db_engine:get_fd_pid(Db).
+
get_del_doc_count(Db) ->
{ok, couch_db_engine:get_del_doc_count(Db)}.
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 91d35b0..dd66b62 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -706,6 +706,7 @@
get_partition_info/2,
get_update_seq/1,
get_uuid/1,
+ get_fd_pid/1,
set_revs_limit/2,
set_security/2,
@@ -894,6 +895,11 @@ get_uuid(#db{} = Db) ->
Engine:get_uuid(EngineState).
+get_fd_pid(#db{} = Db) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ Engine:get_fd_pid(EngineState).
+
+
set_revs_limit(#db{} = Db, RevsLimit) ->
#db{engine = {Engine, EngineState}} = Db,
{ok, NewSt} = Engine:set_revs_limit(EngineState, RevsLimit),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 4227ff0..268abdb 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -42,12 +42,16 @@ init({Engine, DbName, FilePath, Options0}) ->
try
{ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
Db = init_db(DbName, FilePath, EngineState, Options),
- case lists:member(sys_db, Options) of
+ Ctx = couch_util:get_value(user_ctx, Options, undefined),
+ FdPid = couch_db:get_fd_pid(Db),
+ IOQPid = case lists:member(sys_db, Options) of
false ->
- couch_stats_process_tracker:track([couchdb, open_databases]);
+ couch_stats_process_tracker:track([couchdb, open_databases]),
+ ioq:fetch_pid_for(DbName, Ctx, FdPid);
true ->
- ok
+ undefined
end,
+ ok = ioq:set_pid_for(FdPid, IOQPid),
% Don't load validation funs here because the fabric query is
% liable to race conditions. Instead see
% couch_db:validate_doc_update, which loads them lazily.
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 3ceab3a..403ab94 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -80,17 +80,21 @@ sup_start_link() ->
open(DbName, Options0) ->
Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
case ets:lookup(couch_dbs, DbName) of
- [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
+ [#entry{db = Db0, lock = Lock, ioq_pid=IOQPid} = Entry] when Lock =/= locked ->
update_lru(DbName, Entry#entry.db_options),
{ok, Db1} = couch_db:incref(Db0),
+ FdPid = couch_db:get_fd_pid(Db1),
+ ok = ioq:set_pid_for(FdPid, IOQPid),
couch_db:set_user_ctx(Db1, Ctx);
_ ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
Timeout = couch_util:get_value(timeout, Options, infinity),
Create = couch_util:get_value(create_if_missing, Options, false),
case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
- {ok, Db0} ->
+ {ok, #entry{db=Db0, ioq_pid=IOQPid}} ->
{ok, Db1} = couch_db:incref(Db0),
+ FdPid = couch_db:get_fd_pid(Db1),
+ ok = ioq:set_pid_for(FdPid, IOQPid),
couch_db:set_user_ctx(Db1, Ctx);
{not_found, no_db_file} when Create ->
couch_log:warning("creating missing database: ~s", [DbName]),
@@ -118,7 +122,7 @@ create(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
couch_partition:validate_dbname(DbName, Options),
case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
- {ok, Db0} ->
+ {ok, #entry{db=Db0}} ->
Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
{ok, Db1} = couch_db:incref(Db0),
couch_db:set_user_ctx(Db1, Ctx);
@@ -364,13 +368,22 @@ open_async(Server, From, DbName, {Module, Filepath}, Options) ->
T0 = os:timestamp(),
Opener = spawn_link(fun() ->
Res = couch_db:start_link(Module, DbName, Filepath, Options),
- case {Res, lists:member(create, Options)} of
- {{ok, _Db}, true} ->
- couch_event:notify(DbName, created);
+ IOQPid = case Res of
+ {ok, Db} ->
+ case lists:member(create, Options) of
+ true -> couch_event:notify(DbName, created);
+ false -> ok
+ end,
+ Ctx = couch_util:get_value(user_ctx, Options, undefined),
+ FdPid = couch_db:get_fd_pid(Db),
+ case couch_db:is_system_db(Db) of
+ true -> undefined; %% use default IOQ pid for system dbs
+ false -> ioq:fetch_pid_for(DbName, Ctx, FdPid)
+ end;
_ ->
- ok
+ undefined
end,
- gen_server:call(Parent, {open_result, T0, DbName, Res}, infinity),
+ gen_server:call(Parent, {open_result, T0, DbName, Res, IOQPid}, infinity),
unlink(Parent)
end),
ReqType = case lists:member(create, Options) of
@@ -405,7 +418,7 @@ handle_call(reload_engines, _From, Server) ->
{reply, ok, Server#server{engines = get_configured_engines()}};
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
-handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
+handle_call({open_result, T0, DbName, {ok, Db}, IOQPid}, {Opener, _}, Server) ->
true = ets:delete(couch_dbs_pid_to_name, Opener),
OpenTime = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, db_open_time], OpenTime),
@@ -415,9 +428,18 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
% db was deleted during async open
exit(DbPid, kill),
{reply, ok, Server};
- [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
+ [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry0] ->
link(DbPid),
- [gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters],
+ Entry = #entry{
+ name = DbName,
+ db = Db,
+ pid = DbPid,
+ ioq_pid = IOQPid,
+ lock = unlocked,
+ db_options = Entry0#entry.db_options,
+ start_time = couch_db:get_instance_start_time(Db)
+ },
+ [gen_server:reply(Waiter, {ok, Entry}) || Waiter <- Waiters],
% Cancel the creation request if it exists.
case ReqType of
{create, DbName, _Engine, _Options, CrFrom} ->
@@ -425,14 +447,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
_ ->
ok
end,
- true = ets:insert(couch_dbs, #entry{
- name = DbName,
- db = Db,
- pid = DbPid,
- lock = unlocked,
- db_options = Entry#entry.db_options,
- start_time = couch_db:get_instance_start_time(Db)
- }),
+ true = ets:insert(couch_dbs, Entry),
true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}),
Lru = case couch_db:is_system_db(Db) of
false ->
@@ -448,9 +463,9 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
exit(couch_db:get_pid(Db), kill),
{reply, ok, Server}
end;
-handle_call({open_result, T0, DbName, {error, eexist}}, From, Server) ->
- handle_call({open_result, T0, DbName, file_exists}, From, Server);
-handle_call({open_result, _T0, DbName, Error}, {Opener, _}, Server) ->
+handle_call({open_result, T0, DbName, {error, eexist}, IOQPid}, From, Server) ->
+ handle_call({open_result, T0, DbName, file_exists, IOQPid}, From, Server);
+handle_call({open_result, _T0, DbName, Error, _}, {Opener, _}, Server) ->
case ets:lookup(couch_dbs, DbName) of
[] ->
% db was deleted during async open
diff --git a/src/couch/src/couch_server_int.hrl b/src/couch/src/couch_server_int.hrl
index 537a6ab..86f93d3 100644
--- a/src/couch/src/couch_server_int.hrl
+++ b/src/couch/src/couch_server_int.hrl
@@ -15,6 +15,7 @@
name,
db,
pid,
+ ioq_pid,
lock,
waiters,
req_type,
diff --git a/src/couch/test/couch_server_tests.erl b/src/couch/test/couch_server_tests.erl
index fdbc175..d46b8e1 100644
--- a/src/couch/test/couch_server_tests.erl
+++ b/src/couch/test/couch_server_tests.erl
@@ -251,7 +251,7 @@ wait_for_open_async_result(CouchServer, Opener) ->
{_, Messages} = erlang:process_info(CouchServer, messages),
Found = lists:foldl(fun(Msg, Acc) ->
case Msg of
- {'$gen_call', {Opener, _}, {open_result, _, _, {ok, _}}} ->
+ {'$gen_call', {Opener, _}, {open_result, _, _, {ok, _}, _}} ->
true;
_ ->
Acc
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 9740e6a..dcc70e4 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -36,7 +36,14 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
Self = self(),
+ #mrst{
+ id_btree=#btree{fd=Fd},
+ db_name=DbName
+ } = InitState,
+ IOQPid = ioq:fetch_pid_for(DbName, undefined, Fd),
+
MapFun = fun() ->
+ ok = ioq:set_pid_for(Fd, IOQPid),
Progress = case NumChanges of
0 -> 0;
_ -> (NumChangesDone * 100) div NumChanges
@@ -53,7 +60,10 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
couch_task_status:set_update_frequency(500),
map_docs(Self, InitState)
end,
- WriteFun = fun() -> write_results(Self, InitState) end,
+ WriteFun = fun() ->
+ ok = ioq:set_pid_for(Fd, IOQPid),
+ write_results(Self, InitState)
+ end,
spawn_link(MapFun),
spawn_link(WriteFun),
diff --git a/src/couch_pse_tests/src/cpse_test_ref_counting.erl b/src/couch_pse_tests/src/cpse_test_ref_counting.erl
index cb115a7..7b5981f 100644
--- a/src/couch_pse_tests/src/cpse_test_ref_counting.erl
+++ b/src/couch_pse_tests/src/cpse_test_ref_counting.erl
@@ -37,7 +37,8 @@ cpse_empty_monitors({Db, Pid}) ->
Expected = [
Pid,
couch_db:get_pid(Db),
- whereis(couch_stats_process_tracker)
+ whereis(couch_stats_process_tracker),
+ whereis(ioq_opener)
],
?assertEqual([], Pids -- Expected).
@@ -63,13 +64,13 @@ cpse_incref_decref_many({Db, _}) ->
lists:foreach(fun(C) -> wait_client(C) end, Clients),
Pids1 = couch_db_engine:monitored_by(Db),
- % +3 for self, db pid, and process tracker
- ?assertEqual(?NUM_CLIENTS + 3, length(Pids1)),
+ % +4 for self, db pid, process tracker, and ioq opener
+ ?assertEqual(?NUM_CLIENTS + 4, length(Pids1)),
lists:foreach(fun(C) -> close_client(C) end, Clients),
Pids2 = couch_db_engine:monitored_by(Db),
- ?assertEqual(3, length(Pids2)).
+ ?assertEqual(4, length(Pids2)).
start_client(Db0) ->
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 97374be..addc722 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -601,6 +601,10 @@ clean_stack() ->
erlang:get_stacktrace()).
set_io_priority(DbName, Options) ->
+ %% HACK: experiment with spawning IOQ2 pids prior to couch_file open
+ %%Ctx = couch_util:get_value(user_ctx, Options, undefined),
+ %%IOQPid = ioq:fetch_pid_for(DbName, Ctx),
+ %%ok = ioq:set_pid_for(DbName, IOQPid),
case lists:keyfind(io_priority, 1, Options) of
{io_priority, Pri} ->
erlang:put(io_priority, Pri);