You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2019/02/28 18:50:43 UTC

[couchdb] branch ioq-per-shard-or-user created (now 1644b1d)

This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a change to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 1644b1d  WIP: IOQ2 per shard/user

This branch includes the following new commits:

     new 1644b1d  WIP: IOQ2 per shard/user

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: WIP: IOQ2 per shard/user

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chewbranca pushed a commit to branch ioq-per-shard-or-user
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1644b1dfc93106792631a7688ed5bd413dddd03b
Author: Russell Branca <ch...@apache.org>
AuthorDate: Thu Feb 28 18:45:15 2019 +0000

    WIP: IOQ2 per shard/user
---
 Makefile                                           |  2 +-
 src/couch/src/couch_bt_engine.erl                  |  5 ++
 src/couch/src/couch_bt_engine_compactor.erl        | 10 ++++
 src/couch/src/couch_db.erl                         |  7 ++-
 src/couch/src/couch_db_engine.erl                  |  6 +++
 src/couch/src/couch_db_updater.erl                 | 10 ++--
 src/couch/src/couch_server.erl                     | 59 ++++++++++++++--------
 src/couch/src/couch_server_int.hrl                 |  1 +
 src/couch/test/couch_server_tests.erl              |  2 +-
 src/couch_mrview/src/couch_mrview_updater.erl      | 12 ++++-
 src/couch_pse_tests/src/cpse_test_ref_counting.erl |  9 ++--
 src/fabric/src/fabric_rpc.erl                      |  4 ++
 12 files changed, 94 insertions(+), 33 deletions(-)

diff --git a/Makefile b/Makefile
index 208d169..74b925b 100644
--- a/Makefile
+++ b/Makefile
@@ -165,7 +165,7 @@ eunit: export COUCHDB_QUERY_SERVER_JAVASCRIPT = $(shell pwd)/bin/couchjs $(shell
 eunit: couch
 	@$(REBAR) setup_eunit 2> /dev/null
 	@for dir in $(subdirs); do \
-	  $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir; \
+	  $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir || exit 1; \
 	done
 
 setup-eunit: export BUILDDIR = $(shell pwd)
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 7b33c42..d18afb9 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -45,6 +45,7 @@
     get_partition_info/2,
     get_update_seq/1,
     get_uuid/1,
+    get_fd_pid/1,
 
     set_revs_limit/2,
     set_purge_infos_limit/2,
@@ -348,6 +349,10 @@ get_uuid(#st{header = Header}) ->
     couch_bt_engine_header:get(Header, uuid).
 
 
+get_fd_pid(#st{fd = Fd}) ->
+    Fd.
+
+
 set_revs_limit(#st{header = Header} = St, RevsLimit) ->
     NewSt = St#st{
         header = couch_bt_engine_header:set(Header, [
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 737f772..ed14348 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -39,6 +39,7 @@
 start(#st{} = St, DbName, Options, Parent) ->
     erlang:put(io_priority, {db_compact, DbName}),
     #st{
+        fd = FdPid,
         filepath = FilePath,
         header = Header
     } = St,
@@ -46,6 +47,15 @@ start(#st{} = St, DbName, Options, Parent) ->
 
     couch_db_engine:trigger_on_compact(DbName),
 
+    IOQPid = case lists:member(sys_db, Options) of
+        false ->
+            Ctx = couch_util:get_value(user_ctx, Options, undefined),
+            ioq:fetch_pid_for(DbName, Ctx, FdPid);
+        true ->
+            undefined
+    end,
+    ok = ioq:set_pid_for(FdPid, IOQPid),
+
     {ok, NewSt, DName, DFd, MFd, Retry} =
             open_compaction_files(Header, FilePath, Options),
     erlang:monitor(process, MFd),
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 74f4a09..56ad6b2 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -45,6 +45,7 @@
     get_filepath/1,
     get_instance_start_time/1,
     get_pid/1,
+    get_fd_pid/1,
     get_revs_limit/1,
     get_security/1,
     get_update_seq/1,
@@ -249,7 +250,8 @@ monitored_by(Db) ->
     case couch_db_engine:monitored_by(Db) of
         Pids when is_list(Pids) ->
             PidTracker = whereis(couch_stats_process_tracker),
-            Pids -- [Db#db.main_pid, PidTracker];
+            IOQOpener = whereis(ioq_opener),
+            Pids -- [Db#db.main_pid, PidTracker, IOQOpener];
         undefined ->
             []
     end.
@@ -556,6 +558,9 @@ get_purge_infos_limit(#db{}=Db) ->
 get_pid(#db{main_pid = Pid}) ->
     Pid.
 
+get_fd_pid(#db{}=Db) ->
+    couch_db_engine:get_fd_pid(Db).
+
 get_del_doc_count(Db) ->
     {ok, couch_db_engine:get_del_doc_count(Db)}.
 
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 91d35b0..dd66b62 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -706,6 +706,7 @@
     get_partition_info/2,
     get_update_seq/1,
     get_uuid/1,
+    get_fd_pid/1,
 
     set_revs_limit/2,
     set_security/2,
@@ -894,6 +895,11 @@ get_uuid(#db{} = Db) ->
     Engine:get_uuid(EngineState).
 
 
+get_fd_pid(#db{} = Db) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:get_fd_pid(EngineState).
+
+
 set_revs_limit(#db{} = Db, RevsLimit) ->
     #db{engine = {Engine, EngineState}} = Db,
     {ok, NewSt} = Engine:set_revs_limit(EngineState, RevsLimit),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 4227ff0..268abdb 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -42,12 +42,16 @@ init({Engine, DbName, FilePath, Options0}) ->
     try
         {ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
         Db = init_db(DbName, FilePath, EngineState, Options),
-        case lists:member(sys_db, Options) of
+        Ctx = couch_util:get_value(user_ctx, Options, undefined),
+        FdPid = couch_db:get_fd_pid(Db),
+        IOQPid = case lists:member(sys_db, Options) of
             false ->
-                couch_stats_process_tracker:track([couchdb, open_databases]);
+                couch_stats_process_tracker:track([couchdb, open_databases]),
+                ioq:fetch_pid_for(DbName, Ctx, FdPid);
             true ->
-                ok
+                undefined
         end,
+        ok = ioq:set_pid_for(FdPid, IOQPid),
         % Don't load validation funs here because the fabric query is
         % liable to race conditions. Instead see
         % couch_db:validate_doc_update, which loads them lazily.
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 3ceab3a..403ab94 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -80,17 +80,21 @@ sup_start_link() ->
 open(DbName, Options0) ->
     Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
     case ets:lookup(couch_dbs, DbName) of
-    [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
+    [#entry{db = Db0, lock = Lock, ioq_pid=IOQPid} = Entry] when Lock =/= locked ->
         update_lru(DbName, Entry#entry.db_options),
         {ok, Db1} = couch_db:incref(Db0),
+        FdPid = couch_db:get_fd_pid(Db1),
+        ok = ioq:set_pid_for(FdPid, IOQPid),
         couch_db:set_user_ctx(Db1, Ctx);
     _ ->
         Options = maybe_add_sys_db_callbacks(DbName, Options0),
         Timeout = couch_util:get_value(timeout, Options, infinity),
         Create = couch_util:get_value(create_if_missing, Options, false),
         case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
-        {ok, Db0} ->
+        {ok, #entry{db=Db0, ioq_pid=IOQPid}} ->
             {ok, Db1} = couch_db:incref(Db0),
+            FdPid = couch_db:get_fd_pid(Db1),
+            ok = ioq:set_pid_for(FdPid, IOQPid),
             couch_db:set_user_ctx(Db1, Ctx);
         {not_found, no_db_file} when Create ->
             couch_log:warning("creating missing database: ~s", [DbName]),
@@ -118,7 +122,7 @@ create(DbName, Options0) ->
     Options = maybe_add_sys_db_callbacks(DbName, Options0),
     couch_partition:validate_dbname(DbName, Options),
     case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
-    {ok, Db0} ->
+    {ok, #entry{db=Db0}} ->
         Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
         {ok, Db1} = couch_db:incref(Db0),
         couch_db:set_user_ctx(Db1, Ctx);
@@ -364,13 +368,22 @@ open_async(Server, From, DbName, {Module, Filepath}, Options) ->
     T0 = os:timestamp(),
     Opener = spawn_link(fun() ->
         Res = couch_db:start_link(Module, DbName, Filepath, Options),
-        case {Res, lists:member(create, Options)} of
-            {{ok, _Db}, true} ->
-                couch_event:notify(DbName, created);
+        IOQPid = case Res of
+            {ok, Db} ->
+                case lists:member(create, Options) of
+                    true -> couch_event:notify(DbName, created);
+                    false -> ok
+                end,
+                Ctx = couch_util:get_value(user_ctx, Options, undefined),
+                FdPid = couch_db:get_fd_pid(Db),
+                case couch_db:is_system_db(Db) of
+                    true -> undefined; %% use default IOQ pid for system dbs
+                    false -> ioq:fetch_pid_for(DbName, Ctx, FdPid)
+                end;
             _ ->
-                ok
+                undefined
         end,
-        gen_server:call(Parent, {open_result, T0, DbName, Res}, infinity),
+        gen_server:call(Parent, {open_result, T0, DbName, Res, IOQPid}, infinity),
         unlink(Parent)
     end),
     ReqType = case lists:member(create, Options) of
@@ -405,7 +418,7 @@ handle_call(reload_engines, _From, Server) ->
     {reply, ok, Server#server{engines = get_configured_engines()}};
 handle_call(get_server, _From, Server) ->
     {reply, {ok, Server}, Server};
-handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
+handle_call({open_result, T0, DbName, {ok, Db}, IOQPid}, {Opener, _}, Server) ->
     true = ets:delete(couch_dbs_pid_to_name, Opener),
     OpenTime = timer:now_diff(os:timestamp(), T0) / 1000,
     couch_stats:update_histogram([couchdb, db_open_time], OpenTime),
@@ -415,9 +428,18 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
             % db was deleted during async open
             exit(DbPid, kill),
             {reply, ok, Server};
-        [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry] ->
+        [#entry{pid = Opener, req_type = ReqType, waiters = Waiters} = Entry0] ->
             link(DbPid),
-            [gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters],
+            Entry = #entry{
+                name = DbName,
+                db = Db,
+                pid = DbPid,
+                ioq_pid = IOQPid,
+                lock = unlocked,
+                db_options = Entry0#entry.db_options,
+                start_time = couch_db:get_instance_start_time(Db)
+            },
+            [gen_server:reply(Waiter, {ok, Entry}) || Waiter <- Waiters],
             % Cancel the creation request if it exists.
             case ReqType of
                 {create, DbName, _Engine, _Options, CrFrom} ->
@@ -425,14 +447,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
                 _ ->
                     ok
             end,
-            true = ets:insert(couch_dbs, #entry{
-                name = DbName,
-                db = Db,
-                pid = DbPid,
-                lock = unlocked,
-                db_options = Entry#entry.db_options,
-                start_time = couch_db:get_instance_start_time(Db)
-            }),
+            true = ets:insert(couch_dbs, Entry),
             true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}),
             Lru = case couch_db:is_system_db(Db) of
                 false ->
@@ -448,9 +463,9 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {Opener, _}, Server) ->
             exit(couch_db:get_pid(Db), kill),
             {reply, ok, Server}
     end;
-handle_call({open_result, T0, DbName, {error, eexist}}, From, Server) ->
-    handle_call({open_result, T0, DbName, file_exists}, From, Server);
-handle_call({open_result, _T0, DbName, Error}, {Opener, _}, Server) ->
+handle_call({open_result, T0, DbName, {error, eexist}, IOQPid}, From, Server) ->
+    handle_call({open_result, T0, DbName, file_exists, IOQPid}, From, Server);
+handle_call({open_result, _T0, DbName, Error, _}, {Opener, _}, Server) ->
     case ets:lookup(couch_dbs, DbName) of
         [] ->
             % db was deleted during async open
diff --git a/src/couch/src/couch_server_int.hrl b/src/couch/src/couch_server_int.hrl
index 537a6ab..86f93d3 100644
--- a/src/couch/src/couch_server_int.hrl
+++ b/src/couch/src/couch_server_int.hrl
@@ -15,6 +15,7 @@
     name,
     db,
     pid,
+    ioq_pid,
     lock,
     waiters,
     req_type,
diff --git a/src/couch/test/couch_server_tests.erl b/src/couch/test/couch_server_tests.erl
index fdbc175..d46b8e1 100644
--- a/src/couch/test/couch_server_tests.erl
+++ b/src/couch/test/couch_server_tests.erl
@@ -251,7 +251,7 @@ wait_for_open_async_result(CouchServer, Opener) ->
         {_, Messages} = erlang:process_info(CouchServer, messages),
         Found = lists:foldl(fun(Msg, Acc) ->
             case Msg of
-                {'$gen_call', {Opener, _}, {open_result, _, _, {ok, _}}} ->
+                {'$gen_call', {Opener, _}, {open_result, _, _, {ok, _}, _}} ->
                     true;
                 _ ->
                     Acc
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 9740e6a..dcc70e4 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -36,7 +36,14 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
 
     Self = self(),
 
+    #mrst{
+        id_btree=#btree{fd=Fd},
+        db_name=DbName
+    } = InitState,
+    IOQPid = ioq:fetch_pid_for(DbName, undefined, Fd),
+
     MapFun = fun() ->
+        ok = ioq:set_pid_for(Fd, IOQPid),
         Progress = case NumChanges of
             0 -> 0;
             _ -> (NumChangesDone * 100) div NumChanges
@@ -53,7 +60,10 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
         couch_task_status:set_update_frequency(500),
         map_docs(Self, InitState)
     end,
-    WriteFun = fun() -> write_results(Self, InitState) end,
+    WriteFun = fun() ->
+        ok = ioq:set_pid_for(Fd, IOQPid),
+        write_results(Self, InitState)
+    end,
 
     spawn_link(MapFun),
     spawn_link(WriteFun),
diff --git a/src/couch_pse_tests/src/cpse_test_ref_counting.erl b/src/couch_pse_tests/src/cpse_test_ref_counting.erl
index cb115a7..7b5981f 100644
--- a/src/couch_pse_tests/src/cpse_test_ref_counting.erl
+++ b/src/couch_pse_tests/src/cpse_test_ref_counting.erl
@@ -37,7 +37,8 @@ cpse_empty_monitors({Db, Pid}) ->
     Expected = [
         Pid,
         couch_db:get_pid(Db),
-        whereis(couch_stats_process_tracker)
+        whereis(couch_stats_process_tracker),
+        whereis(ioq_opener)
     ],
     ?assertEqual([], Pids -- Expected).
 
@@ -63,13 +64,13 @@ cpse_incref_decref_many({Db, _}) ->
     lists:foreach(fun(C) -> wait_client(C) end, Clients),
 
     Pids1 = couch_db_engine:monitored_by(Db),
-    % +3 for self, db pid, and process tracker
-    ?assertEqual(?NUM_CLIENTS + 3, length(Pids1)),
+    % +4 for self, db pid, process tracker, and ioq opener
+    ?assertEqual(?NUM_CLIENTS + 4, length(Pids1)),
 
     lists:foreach(fun(C) -> close_client(C) end, Clients),
 
     Pids2 = couch_db_engine:monitored_by(Db),
-    ?assertEqual(3, length(Pids2)).
+    ?assertEqual(4, length(Pids2)).
 
 
 start_client(Db0) ->
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 97374be..addc722 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -601,6 +601,10 @@ clean_stack() ->
         erlang:get_stacktrace()).
 
 set_io_priority(DbName, Options) ->
+    %% HACK: experiment with spawning IOQ2 pids prior to couch_file open
+    %%Ctx = couch_util:get_value(user_ctx, Options, undefined),
+    %%IOQPid = ioq:fetch_pid_for(DbName, Ctx),
+    %%ok = ioq:set_pid_for(DbName, IOQPid),
     case lists:keyfind(io_priority, 1, Options) of
     {io_priority, Pri} ->
         erlang:put(io_priority, Pri);