You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/06/04 22:16:57 UTC

[couchdb] branch optimize-couch-views-acceptors created (now b4cfd7a)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch optimize-couch-views-acceptors
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at b4cfd7a  Optimize couch_view job accepts: split into separate acceptors and workers

This branch includes the following new commits:

     new b4cfd7a  Optimize couch_view job accepts: split into separate acceptors and workers

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: Optimize couch_view job accepts: split into separate acceptors and workers

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch optimize-couch-views-acceptors
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b4cfd7a78928a0a2f7c35818d6f4d428f1ff83ab
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Thu Jun 4 17:57:41 2020 -0400

    Optimize couch_view job accepts: split into separate acceptors and workers
    
    Instead of max workers all waiting on accept, and racing to grab the same job,
    generating conflicts and retries in the process, switch to having A acceptors
    and W workers as separate configurable items, such that A < W.
    
    A smaller number of acceptors (5 by default) will be spawned and will wait to
    accept jobs. As soon as any one of them accepts a job and starts executing it,
    it will notify the parent server via a gen_server call. The main
    couch_views_server, when notified, will mark the acceptor as a worker as long
    as A + W < max_workers.
    
    When any worker exits, the main server may spawn more acceptors if the number
    of current acceptors A < max_acceptors _and_ A + W < max_workers.
    
    The main idea behind A + W < max_workers is that we consider acceptors as
    potential workers, they could accept a job at any time and immediately start
    executing it, so we wouldn't want to start another acceptor.
    
    As an example here is what might happen with say max_acceptors = 5 and
    max_workers = 100:
    
    1. Starting out:
      A = 5, W = 0
    
    2. After 2 acceptors get jobs and start running them:
      A = 3, W = 2
    Then immediately spawn another 2 acceptors:
      A = 5, W = 2
    
    3. After 95 workers are started it might look like:
      A = 5, W = 95
    
    4. Now if 3 acceptors accept a job, it would look like:
      A = 2, W = 98
    But no more acceptors would be started.
    
    5. If the last 2 acceptors also accept jobs it might look like:
     A = 0, W = 100
    This is when it reaches full utilization and doesn't accept any more jobs.
    
    6. Then if 1 worker exits:
     A = 0, W = 99
    And 1 acceptor will be spawned
     A = 1, W = 99
    
    If all 99 workers exit, it will go back to:
     A = 5, W = 0
---
 src/couch_views/src/couch_views_indexer.erl |  3 +
 src/couch_views/src/couch_views_server.erl  | 97 +++++++++++++++++++++++------
 2 files changed, 80 insertions(+), 20 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 4d09fdb..6708be8 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -46,6 +46,9 @@ spawn_link() ->
 init() ->
     Opts = #{no_schedule => true},
     {ok, Job, Data0} = couch_jobs:accept(?INDEX_JOB_TYPE, Opts),
+
+    % Tell couch_views_server that this acceptor now holds a job
+    couch_views_server:accepted(self()),
+
     Data = upgrade_data(Data0),
     #{
         <<"db_name">> := DbName,
diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl
index d14216e..6d4c8d6 100644
--- a/src/couch_views/src/couch_views_server.erl
+++ b/src/couch_views/src/couch_views_server.erl
@@ -20,6 +20,9 @@
     start_link/0
 ]).
 
+-export([
+    accepted/1
+]).
 
 -export([
     init/1,
@@ -30,7 +33,7 @@
     code_change/3
 ]).
 
-
+-define(MAX_ACCEPTORS, 5).
 -define(MAX_WORKERS, 100).
 
 
@@ -38,20 +41,44 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
+% Called by an acceptor process once it has accepted a job. Blocks (no
+% timeout) until couch_views_server has handled the {accepted, Pid} request.
+accepted(Worker) when is_pid(Worker) ->
+    Request = {accepted, Worker},
+    gen_server:call(?MODULE, Request, infinity).
+
+
 init(_) ->
     process_flag(trap_exit, true),
     couch_views_jobs:set_timeout(),
     St = #{
+        acceptors => #{},
         workers => #{},
+        max_acceptors => max_acceptors(),
         max_workers => max_workers()
     },
-    {ok, spawn_workers(St)}.
+    {ok, spawn_acceptors(St)}.
 
 
 terminate(_, _St) ->
     ok.
 
 
+% An acceptor reports that it has accepted a job: move its pid from the
+% acceptors map to the workers map, then try to spawn replacement acceptors.
+% A report from a pid that is not a known acceptor stops the server.
+handle_call({accepted, Pid}, _From, St) ->
+    #{acceptors := Acceptors, workers := Workers} = St,
+    case maps:is_key(Pid, Acceptors) of
+        false ->
+            couch_log:error("~p : unknown acceptor processs ~p", [?MODULE, Pid]),
+            {stop, {unknown_acceptor_pid, Pid}, St};
+        true ->
+            Promoted = St#{
+                acceptors := maps:remove(Pid, Acceptors),
+                workers := Workers#{Pid => true}
+            },
+            {reply, ok, spawn_acceptors(Promoted)}
+    end;
+
 handle_call(Msg, _From, St) ->
     {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
 
@@ -61,20 +88,16 @@ handle_cast(Msg, St) ->
 
 
 handle_info({'EXIT', Pid, Reason}, St) ->
-    #{workers := Workers} = St,
-    case maps:is_key(Pid, Workers) of
-        true ->
-            if Reason == normal -> ok; true ->
-                LogMsg = "~p : indexer process ~p exited with ~p",
-                couch_log:error(LogMsg, [?MODULE, Pid, Reason])
-            end,
-            NewWorkers = maps:remove(Pid, Workers),
-            {noreply, spawn_workers(St#{workers := NewWorkers})};
-        false ->
-            LogMsg = "~p : unknown process ~p exited with ~p",
-            couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
-            {stop, {unknown_pid_exit, Pid}, St}
-    end;
+    #{
+        acceptors := Acceptors,
+        workers := Workers
+    } = St,
+    % In Erlang 21+ could check map keys directly in the function head
+    case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of
+        {true, false} -> handle_acceptor_exit(St, Pid, Reason);
+        {false, true} -> handle_worker_exit(St, Pid, Reason);
+        {false, false} -> handle_unknown_exit(St, Pid, Reason)
+   end;
 
 handle_info(Msg, St) ->
     {stop, {bad_info, Msg}, St}.
@@ -84,20 +107,54 @@ code_change(_OldVsn, St, _Extra) ->
     {ok, St}.
 
 
-spawn_workers(St) ->
+% Worker process exit handlers
+
+% An acceptor exited before reporting an accepted job: forget its pid, log the
+% exit reason and top the acceptor pool back up.
+handle_acceptor_exit(#{acceptors := Acceptors} = St, Pid, Reason) ->
+    St1 = St#{acceptors := maps:remove(Pid, Acceptors)},
+    LogMsg = "~p : acceptor process ~p exited with ~p",
+    couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+    {noreply, spawn_acceptors(St1)}.
+
+
+% A worker (an indexer that had already accepted a job) exited. The pid is
+% dropped from the workers map, abnormal exit reasons are logged, and the
+% acceptor pool is replenished.
+handle_worker_exit(#{workers := Workers} = St, Pid, Reason) ->
+    Remaining = maps:remove(Pid, Workers),
+    case Reason of
+        normal ->
+            ok;
+        _ ->
+            LogMsg = "~p : indexer process ~p exited with ~p",
+            couch_log:error(LogMsg, [?MODULE, Pid, Reason])
+    end,
+    {noreply, spawn_acceptors(St#{workers := Remaining})}.
+
+
+% Exit signal from a pid that is tracked neither as an acceptor nor as a
+% worker: log it and stop the server.
+handle_unknown_exit(St, Pid, Reason) ->
+    Args = [?MODULE, Pid, Reason],
+    couch_log:error("~p : unknown process ~p exited with ~p", Args),
+    StopReason = {unknown_pid_exit, Pid},
+    {stop, StopReason, St}.
+
+
+spawn_acceptors(St) ->
     #{
         workers := Workers,
+        acceptors := Acceptors,
+        max_acceptors := MaxAcceptors,
         max_workers := MaxWorkers
     } = St,
-    case maps:size(Workers) < MaxWorkers of
+    ACnt = maps:size(Acceptors),
+    WCnt = maps:size(Workers),
+    case ACnt < MaxAcceptors andalso (ACnt + WCnt) < MaxWorkers of
         true ->
             Pid = couch_views_indexer:spawn_link(),
-            NewSt = St#{workers := Workers#{Pid => true}},
-            spawn_workers(NewSt);
+            NewSt = St#{acceptors := Acceptors#{Pid => true}},
+            spawn_acceptors(NewSt);
         false ->
             St
     end.
 
 
+% Configured acceptor limit: reads "max_acceptors" from the "couch_views"
+% config section, falling back to ?MAX_ACCEPTORS.
+max_acceptors() ->
+    Default = ?MAX_ACCEPTORS,
+    config:get_integer("couch_views", "max_acceptors", Default).
+
+
 max_workers() ->
     config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).