You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2019/02/28 18:50:44 UTC

[couchdb] 01/01: WIP: IOQ2 per shard/user

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1644b1dfc93106792631a7688ed5bd413dddd03b
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Feb 28 18:45:15 2019 +0000

    WIP: IOQ2 per shard/user
---
 Makefile                                           |  2 +-
 src/couch/src/couch_bt_engine.erl                  |  5 ++
 src/couch/src/couch_bt_engine_compactor.erl        | 10 ++++
 src/couch/src/couch_db.erl                         |  7 ++-
 src/couch/src/couch_db_engine.erl                  |  6 +++
 src/couch/src/couch_db_updater.erl                 | 10 ++--
 src/couch/src/couch_server.erl                     | 59 ++++++++++++++--------
 src/couch/src/couch_server_int.hrl                 |  1 +
 src/couch/test/couch_server_tests.erl              |  2 +-
 src/couch_mrview/src/couch_mrview_updater.erl      | 12 ++++-
 src/couch_pse_tests/src/cpse_test_ref_counting.erl |  9 ++--
 src/fabric/src/fabric_rpc.erl                      |  4 ++
 12 files changed, 94 insertions(+), 33 deletions(-)

diff --git a/Makefile b/Makefile
index 208d169..74b925b 100644
--- a/Makefile
+++ b/Makefile
@@ -165,7 +165,7 @@ eunit: export COUCHDB_QUERY_SERVER_JAVASCRIPT = $(shell pwd)/bin/couchjs $(shell
 eunit: couch
 	@$(REBAR) setup_eunit 2> /dev/null
 	@for dir in $(subdirs); do \
-	  $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir; \
+	  $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir || exit 1; \
 	done
 
 setup-eunit: export BUILDDIR = $(shell pwd)
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 7b33c42..d18afb9 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -45,6 +45,7 @@
     get_partition_info/2,
     get_update_seq/1,
     get_uuid/1,
+    get_fd_pid/1,
 
     set_revs_limit/2,
     set_purge_infos_limit/2,
@@ -348,6 +349,10 @@ get_uuid(#st{header = Header}) ->
     couch_bt_engine_header:get(Header, uuid).
 
 
+get_fd_pid(#st{fd = Fd}) ->
+    Fd.
+
+
 set_revs_limit(#st{header = Header} = St, RevsLimit) ->
     NewSt = St#st{
         header = couch_bt_engine_header:set(Header, [
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 737f772..ed14348 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -39,6 +39,7 @@
 start(#st{} = St, DbName, Options, Parent) ->
     erlang:put(io_priority, {db_compact, DbName}),
     #st{
+        fd = FdPid,
         filepath = FilePath,
         header = Header
     } = St,
@@ -46,6 +47,15 @@ start(#st{} = St, DbName, Options, Parent) ->
 
     couch_db_engine:trigger_on_compact(DbName),
 
+    IOQPid = case lists:member(sys_db, Options) of
+        false ->
+            Ctx = couch_util:get_value(user_ctx, Options, undefined),
+            ioq:fetch_pid_for(DbName, Ctx, FdPid);
+        true ->
+            undefined
+    end,
+    ok = ioq:set_pid_for(FdPid, IOQPid),
+
     {ok, NewSt, DName, DFd, MFd, Retry} =
             open_compaction_files(Header, FilePath, Options),
     erlang:monitor(process, MFd),
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 74f4a09..56ad6b2 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -45,6 +45,7 @@
     get_filepath/1,
     get_instance_start_time/1,
     get_pid/1,
+    get_fd_pid/1,
     get_revs_limit/1,
     get_security/1,
     get_update_seq/1,
@@ -249,7 +250,8 @@ monitored_by(Db) ->
     case couch_db_engine:monitored_by(Db) of
         Pids when is_list(Pids) ->
             PidTracker = whereis(couch_stats_process_tracker),
-            Pids -- [Db#db.main_pid, PidTracker];
+            IOQOpener = whereis(ioq_opener),
+            Pids -- [Db#db.main_pid, PidTracker, IOQOpener];
         undefined ->
             []
     end.
@@ -556,6 +558,9 @@ get_purge_infos_limit(#db{}=Db) ->
 get_pid(#db{main_pid = Pid}) ->
     Pid.
 
+get_fd_pid(#db{}=Db) ->
+    couch_db_engine:get_fd_pid(Db).
+
 get_del_doc_count(Db) ->
     {ok, couch_db_engine:get_del_doc_count(Db)}.
 
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 91d35b0..dd66b62 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -706,6 +706,7 @@
     get_partition_info/2,
     get_update_seq/1,
     get_uuid/1,
+    get_fd_pid/1,
 
     set_revs_limit/2,
     set_security/2,
@@ -894,6 +895,11 @@ get_uuid(#db{} = Db) ->
     Engine:get_uuid(EngineState).
 
 
+get_fd_pid(#db{} = Db) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:get_fd_pid(EngineState).
+
+
 set_revs_limit(#db{} = Db, RevsLimit) ->
     #db{engine = {Engine, EngineState}} = Db,
     {ok, NewSt} = Engine:set_revs_limit(EngineState, RevsLimit),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 4227ff0..268abdb 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -42,12 +42,16 @@ init({Engine, DbName, FilePath, Options0}) ->
     try
         {ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
         Db = init_db(DbName, FilePath, EngineState, Options),
-        case lists:member(sys_db, Options) of
+        Ctx = couch_util:get_value(user_ctx, Options, undefined),
+        FdPid = couch_db:get_fd_pid(Db),
+        IOQPid = case lists:member(sys_db, Options) of
             false ->
-                couch_stats_process_tracker:track([couchdb, open_databases]);
+                couch_stats_process_tracker:track([couchdb, open_databases]),
+                ioq:fetch_pid_for(DbName, Ctx, FdPid);
             true ->
-                ok
+                undefined
         end,
+        ok = ioq:set_pid_for(FdPid, IOQPid),
         % Don't load validation funs here because the fabric query is
         % liable to race conditions. Instead see
         % couch_db:validate_doc_update, which loads them lazily.
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 3ceab3a..403ab94 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -80,17 +80,21 @@ sup_start_link() ->
 open(DbName, Options0) ->
     Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
     case ets:lookup(couch_dbs, DbName) of
-    [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
+    [#entry{db = Db0, lock = Lock, ioq_pid=IOQPid} = Entry] when Lock =/= locked ->
         update_lru(DbName, Entry#entry.db_options),
         {ok, Db1} = couch_db:incref(Db0),
+        FdPid = couch_db:get_fd_pid(Db1),
+        ok = ioq:set_pid_for(FdPid, IOQPid),
         couch_db:set_user_ctx(Db1, Ctx);
     _ ->
         Options = maybe_add_sys_db_callbacks(DbName, Options0),
         Timeout = couch_util:get_value(timeout, Options, infinity),
         Create = couch_util:get_value(create_if_missing, Options, false),
         case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
-        {ok, Db0} ->
+        {ok, #entry{db=Db0, ioq_pid=IOQPid}} ->
             {ok, Db1} = couch_db:incref(Db0),
+            FdPid = couch_db:get_fd_pid(Db1),
+            ok = ioq:set_pid_for(FdPid, IOQPid),
             couch_db:set_user_ctx(Db1, Ctx);
         {not_found, no_db_file} when Create ->
             couch_log:warning("creating missing database: ~s", [DbName]),
@@ -118,7 +122,7 @@ create(DbName, Options0) ->
     Options = maybe_add_sys_db_callbacks(DbName, Options0),
     couch_partition:validate_dbname(DbName, Options),
     case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
-    {ok, Db0} ->
+    {ok, #entry{db=Db0}} ->
         Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
         {ok, Db1} = couch_db:incref(Db0),
         couch_db:set_user_ctx(Db1, Ctx);
@@ -364,13 +368,22 @@ open_async(Server, From, DbName, {Module, Filepath}, Options) ->
     T0 = os:timestamp(),
     Opener = spawn_link(fun() ->
         Res = couch_db:start_link(Module, DbName, Filepath, Options),
-        case {Res, lists:member(create, Options)} of
-            {{ok, _Db}, true} ->
-                couch_event:notify(DbName, created);
+        IOQPid = case Res of
+            {ok, Db} ->
+                case lists:member(create, Options) of
+                    true -> couch_event:notify(DbName, created);
+                    false -> ok
+                end,
+                Ctx = couch_util:get_value(user_ctx, Options, undefined),
+                FdPid = couch_db:get_fd_pid(Db),
+                case couch_db:is_system_db(Db) of
+                    true -> undefined; %% use default IOQ pid for system dbs
+                    false -> ioq:fetch_pid_for(DbName, Ctx, FdPid)
+                end;
             _ ->
-                ok
+                undefined
         end,
-        gen_server:call(Parent, {open_result, T0, DbName, Res}, infinity),
+        gen_server:call(Parent, {open_result, T0, DbName, Res, IOQPid}, infinity),
         unlink(Parent)
     end),
     ReqType = case lists:member(create, Options) of
@@ -405,7 +418,7 @@ handle_call(reload_engines, _From, Server) ->
     {reply, ok, Server#server{engines = get_configured_engines()}};
 handle_call(get_server, _From, Server) ->
     {reply, {ok, Server}, Server};
-handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
+handle_call({open_result, T0, DbName, {ok, Db}, IOQPid}, {Opener, _}, Server) ->
     true = ets:delete(couch_dbs_pid_to_name, Opener),
     OpenTime = timer:now_diff(os:timestamp(), T0) / 1000,
     couch_stats:update_histogram([couchdb, db_open_time], OpenTime),
@@ -415,9 +428,18 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
             % db was deleted during async open
             exit(DbPid, kill),
             {reply, ok, Server};
-        [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
+        [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry0] ->
             link(DbPid),
-            [gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters],
+            Entry = #entry{
+                name = DbName,
+                db = Db,
+                pid = DbPid,
+                ioq_pid = IOQPid,
+                lock = unlocked,
+                db_options = Entry0#entry.db_options,
+                start_time = couch_db:get_instance_start_time(Db)
+            },
+            [gen_server:reply(Waiter, {ok, Entry}) || Waiter <- Waiters],
             % Cancel the creation request if it exists.
             case ReqType of
                 {create, DbName, _Engine, _Options, CrFrom} ->
@@ -425,14 +447,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
                 _ ->
                     ok
             end,
-            true = ets:insert(couch_dbs, #entry{
-                name = DbName,
-                db = Db,
-                pid = DbPid,
-                lock = unlocked,
-                db_options = Entry#entry.db_options,
-                start_time = couch_db:get_instance_start_time(Db)
-            }),
+            true = ets:insert(couch_dbs, Entry),
             true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}),
             Lru = case couch_db:is_system_db(Db) of
                 false ->
@@ -448,9 +463,9 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
             exit(couch_db:get_pid(Db), kill),
             {reply, ok, Server}
     end;
-handle_call({open_result, T0, DbName, {error, eexist}}, From, Server) ->
-    handle_call({open_result, T0, DbName, file_exists}, From, Server);
-handle_call({open_result, _T0, DbName, Error}, {Opener, _}, Server) ->
+handle_call({open_result, T0, DbName, {error, eexist}, IOQPid}, From, Server) ->
+    handle_call({open_result, T0, DbName, file_exists, IOQPid}, From, Server);
+handle_call({open_result, _T0, DbName, Error, _}, {Opener, _}, Server) ->
     case ets:lookup(couch_dbs, DbName) of
         [] ->
             % db was deleted during async open
diff --git a/src/couch/src/couch_server_int.hrl b/src/couch/src/couch_server_int.hrl
index 537a6ab..86f93d3 100644
--- a/src/couch/src/couch_server_int.hrl
+++ b/src/couch/src/couch_server_int.hrl
@@ -15,6 +15,7 @@
     name,
     db,
     pid,
+    ioq_pid,
     lock,
     waiters,
     req_type,
diff --git a/src/couch/test/couch_server_tests.erl b/src/couch/test/couch_server_tests.erl
index fdbc175..d46b8e1 100644
--- a/src/couch/test/couch_server_tests.erl
+++ b/src/couch/test/couch_server_tests.erl
@@ -251,7 +251,7 @@ wait_for_open_async_result(CouchServer, Opener) ->
         {_, Messages} = erlang:process_info(CouchServer, messages),
         Found = lists:foldl(fun(Msg, Acc) ->
             case Msg of
-                {'$gen_call', {Opener, _}, {open_result, _, _, {ok, _}}} ->
+                {'$gen_call', {Opener, _}, {open_result, _, _, {ok, _}, _}} ->
                     true;
                 _ ->
                     Acc
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 9740e6a..dcc70e4 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -36,7 +36,14 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
 
     Self = self(),
 
+    #mrst{
+        id_btree=#btree{fd=Fd},
+        db_name=DbName
+    } = InitState,
+    IOQPid = ioq:fetch_pid_for(DbName, undefined, Fd),
+
     MapFun = fun() ->
+        ok = ioq:set_pid_for(Fd, IOQPid),
         Progress = case NumChanges of
             0 -> 0;
             _ -> (NumChangesDone * 100) div NumChanges
@@ -53,7 +60,10 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
         couch_task_status:set_update_frequency(500),
         map_docs(Self, InitState)
     end,
-    WriteFun = fun() -> write_results(Self, InitState) end,
+    WriteFun = fun() ->
+        ok = ioq:set_pid_for(Fd, IOQPid),
+        write_results(Self, InitState)
+    end,
 
     spawn_link(MapFun),
     spawn_link(WriteFun),
diff --git a/src/couch_pse_tests/src/cpse_test_ref_counting.erl b/src/couch_pse_tests/src/cpse_test_ref_counting.erl
index cb115a7..7b5981f 100644
--- a/src/couch_pse_tests/src/cpse_test_ref_counting.erl
+++ b/src/couch_pse_tests/src/cpse_test_ref_counting.erl
@@ -37,7 +37,8 @@ cpse_empty_monitors({Db, Pid}) ->
     Expected = [
         Pid,
         couch_db:get_pid(Db),
-        whereis(couch_stats_process_tracker)
+        whereis(couch_stats_process_tracker),
+        whereis(ioq_opener)
     ],
     ?assertEqual([], Pids -- Expected).
 
@@ -63,13 +64,13 @@ cpse_incref_decref_many({Db, _}) ->
     lists:foreach(fun(C) -> wait_client(C) end, Clients),
 
     Pids1 = couch_db_engine:monitored_by(Db),
-    % +3 for self, db pid, and process tracker
-    ?assertEqual(?NUM_CLIENTS + 3, length(Pids1)),
+    % +4 for self, db pid, process tracker, and ioq opener
+    ?assertEqual(?NUM_CLIENTS + 4, length(Pids1)),
 
     lists:foreach(fun(C) -> close_client(C) end, Clients),
 
     Pids2 = couch_db_engine:monitored_by(Db),
-    ?assertEqual(3, length(Pids2)).
+    ?assertEqual(4, length(Pids2)).
 
 
 start_client(Db0) ->
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 97374be..addc722 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -601,6 +601,10 @@ clean_stack() ->
         erlang:get_stacktrace()).
 
 set_io_priority(DbName, Options) ->
+    %% HACK: experiment with spawning IOQ2 pids prior to couch_file open
+    %%Ctx = couch_util:get_value(user_ctx, Options, undefined),
+    %%IOQPid = ioq:fetch_pid_for(DbName, Ctx),
+    %%ok = ioq:set_pid_for(DbName, IOQPid),
     case lists:keyfind(io_priority, 1, Options) of
     {io_priority, Pri} ->
         erlang:put(io_priority, Pri);