You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/06/08 17:14:40 UTC

[couchdb] branch prototype/fdb-layer updated: Split couch_views acceptors and workers

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/fdb-layer by this push:
     new 3536ad8  Split couch_views acceptors and workers
3536ad8 is described below

commit 3536ad87088ad0901cf28213f006ddf7c52b85b5
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Thu Jun 4 17:57:41 2020 -0400

    Split couch_views acceptors and workers
    
    Optimize couch_views by using a separate set of acceptors and workers.
    Previously, all `max_workers` where spawned on startup, and were to
    waiting to accept jobs in parallel. In a setup with a large number of
    pods, and 100 workers per pod, that could lead to a lot of conflicts
    being generated when all those workers race to accept the same job at
    the same time.
    
    The improvement is to spawn only a limited number of acceptors (5, by
    default), then, spawn more after some of them become workers. Also,
    when some workers finish or die with an error, check if more acceptors
    could be spawned.
    
    As an example, here is what might happen with `max_acceptors = 5` and
    `max_workers = 100` (`A` and `W` are the current counts of acceptors
    and workers, respectively):
    
    1. Starting out:
      `A = 5, W = 0`
    
    2. After 2 acceptors start running:
      `A = 3, W = 2`
    Then immediately 2 more acceptors are spawned:
      `A = 5, W = 2`
    
    3. After 95 workers are started:
      `A = 5, W = 95`
    
    4. Now if 3 acceptors accept, it would look like:
      `A = 2, W = 98`
    But no more acceptors would be started.
    
    5. If the last 2 acceptors also accept jobs:
      `A = 0, W = 100`
    At this point no more indexing jobs can be accepted and started until
      at least one of the workers finishes and exits.
    
    6. If 1 worker exits:
      `A = 0, W = 99`
    An acceptor will be immediately spawned:
      `A = 1, W = 99`
    
    7. If all 99 workers exit, it will go back to:
     `A = 5, W = 0`
---
 rel/overlay/etc/default.ini                        |  12 +-
 src/couch_views/src/couch_views_indexer.erl        |   3 +
 src/couch_views/src/couch_views_server.erl         |  96 +++++++--
 src/couch_views/test/couch_views_server_test.erl   | 218 +++++++++++++++++++++
 .../test/couch_views_trace_index_test.erl          |   2 +
 5 files changed, 309 insertions(+), 22 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 43e1c0b..40a3b31 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -294,11 +294,17 @@ iterations = 10 ; iterations for password hashing
 
 ; Settings for view indexing
 [couch_views]
-; max_workers = 100
+; Maximum acceptors waiting to accept view indexing jobs
+;max_acceptors = 5
+;
+; Maximum number of view indexing workers
+;max_workers = 100
+;
 ; The maximum allowed key size emitted from a view for a document (in bytes)
-; key_size_limit = 8000
+;key_size_limit = 8000
+;
 ; The maximum allowed value size emitted from a view for a document (in bytes)
-; value_size_limit = 64000
+;value_size_limit = 64000
 
 ; CSP (Content Security Policy) Support for _utils
 [csp]
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 4d09fdb..31868d9 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -46,6 +46,9 @@ spawn_link() ->
 init() ->
     Opts = #{no_schedule => true},
     {ok, Job, Data0} = couch_jobs:accept(?INDEX_JOB_TYPE, Opts),
+
+    couch_views_server:accepted(self()),
+
     Data = upgrade_data(Data0),
     #{
         <<"db_name">> := DbName,
diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl
index d14216e..e45a9f3 100644
--- a/src/couch_views/src/couch_views_server.erl
+++ b/src/couch_views/src/couch_views_server.erl
@@ -20,6 +20,9 @@
     start_link/0
 ]).
 
+-export([
+    accepted/1
+]).
 
 -export([
     init/1,
@@ -30,7 +33,7 @@
     code_change/3
 ]).
 
-
+-define(MAX_ACCEPTORS, 5).
 -define(MAX_WORKERS, 100).
 
 
@@ -38,20 +41,44 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
+accepted(Worker) when is_pid(Worker) ->
+    gen_server:call(?MODULE, {accepted, Worker}, infinity).
+
+
 init(_) ->
     process_flag(trap_exit, true),
     couch_views_jobs:set_timeout(),
     St = #{
+        acceptors => #{},
         workers => #{},
+        max_acceptors => max_acceptors(),
         max_workers => max_workers()
     },
-    {ok, spawn_workers(St)}.
+    {ok, spawn_acceptors(St)}.
 
 
 terminate(_, _St) ->
     ok.
 
 
+handle_call({accepted, Pid}, _From, St) ->
+    #{
+        acceptors := Acceptors,
+        workers := Workers
+    } = St,
+    case maps:is_key(Pid, Acceptors) of
+        true ->
+            St1 = St#{
+                acceptors := maps:remove(Pid, Acceptors),
+                workers := Workers#{Pid => true}
+            },
+            {reply, ok, spawn_acceptors(St1)};
+        false ->
+            LogMsg = "~p : unknown acceptor processs ~p",
+            couch_log:error(LogMsg, [?MODULE, Pid]),
+            {stop, {unknown_acceptor_pid, Pid}, St}
+    end;
+
 handle_call(Msg, _From, St) ->
     {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
 
@@ -61,19 +88,16 @@ handle_cast(Msg, St) ->
 
 
 handle_info({'EXIT', Pid, Reason}, St) ->
-    #{workers := Workers} = St,
-    case maps:is_key(Pid, Workers) of
-        true ->
-            if Reason == normal -> ok; true ->
-                LogMsg = "~p : indexer process ~p exited with ~p",
-                couch_log:error(LogMsg, [?MODULE, Pid, Reason])
-            end,
-            NewWorkers = maps:remove(Pid, Workers),
-            {noreply, spawn_workers(St#{workers := NewWorkers})};
-        false ->
-            LogMsg = "~p : unknown process ~p exited with ~p",
-            couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
-            {stop, {unknown_pid_exit, Pid}, St}
+    #{
+        acceptors := Acceptors,
+        workers := Workers
+    } = St,
+
+    % In Erlang 21+ could check map keys directly in the function head
+    case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of
+        {true, false} -> handle_acceptor_exit(St, Pid, Reason);
+        {false, true} -> handle_worker_exit(St, Pid, Reason);
+        {false, false} -> handle_unknown_exit(St, Pid, Reason)
     end;
 
 handle_info(Msg, St) ->
@@ -84,20 +108,54 @@ code_change(_OldVsn, St, _Extra) ->
     {ok, St}.
 
 
-spawn_workers(St) ->
+% Worker process exit handlers
+
+handle_acceptor_exit(#{acceptors := Acceptors} = St, Pid, Reason) ->
+    St1 = St#{acceptors := maps:remove(Pid, Acceptors)},
+    LogMsg = "~p : acceptor process ~p exited with ~p",
+    couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+    {noreply, spawn_acceptors(St1)}.
+
+
+handle_worker_exit(#{workers := Workers} = St, Pid, normal) ->
+    St1 = St#{workers := maps:remove(Pid, Workers)},
+    {noreply, spawn_acceptors(St1)};
+
+handle_worker_exit(#{workers := Workers} = St, Pid, Reason) ->
+    St1 = St#{workers := maps:remove(Pid, Workers)},
+    LogMsg = "~p : indexer process ~p exited with ~p",
+    couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+    {noreply, spawn_acceptors(St1)}.
+
+
+handle_unknown_exit(St, Pid, Reason) ->
+    LogMsg = "~p : unknown process ~p exited with ~p",
+    couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+    {stop, {unknown_pid_exit, Pid}, St}.
+
+
+spawn_acceptors(St) ->
     #{
         workers := Workers,
+        acceptors := Acceptors,
+        max_acceptors := MaxAcceptors,
         max_workers := MaxWorkers
     } = St,
-    case maps:size(Workers) < MaxWorkers of
+    ACnt = maps:size(Acceptors),
+    WCnt = maps:size(Workers),
+    case ACnt < MaxAcceptors andalso (ACnt + WCnt) < MaxWorkers of
         true ->
             Pid = couch_views_indexer:spawn_link(),
-            NewSt = St#{workers := Workers#{Pid => true}},
-            spawn_workers(NewSt);
+            NewSt = St#{acceptors := Acceptors#{Pid => true}},
+            spawn_acceptors(NewSt);
         false ->
             St
     end.
 
 
+max_acceptors() ->
+    config:get_integer("couch_views", "max_acceptors", ?MAX_ACCEPTORS).
+
+
 max_workers() ->
     config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).
diff --git a/src/couch_views/test/couch_views_server_test.erl b/src/couch_views/test/couch_views_server_test.erl
new file mode 100644
index 0000000..23c807c
--- /dev/null
+++ b/src/couch_views/test/couch_views_server_test.erl
@@ -0,0 +1,218 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_server_test).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+couch_views_server_test_() ->
+    {
+        "Test couch_views_server",
+        {
+            setup,
+            fun setup/0,
+            fun cleanup/1,
+            {
+                foreach,
+                fun foreach_setup/0,
+                fun foreach_teardown/1,
+                [
+                    ?TDEF_FE(max_acceptors_started),
+                    ?TDEF_FE(acceptors_become_workers),
+                    ?TDEF_FE(handle_worker_death),
+                    ?TDEF_FE(handle_acceptor_death),
+                    ?TDEF_FE(handle_unknown_process_death),
+                    ?TDEF_FE(max_workers_limit_works),
+                    ?TDEF_FE(max_acceptors_greater_than_max_workers)
+                ]
+            }
+        }
+    }.
+
+
+setup() ->
+    Ctx = test_util:start_couch([
+            fabric,
+            couch_jobs,
+            couch_rate,
+            couch_js,
+            couch_eval
+        ]),
+    Ctx.
+
+
+cleanup(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+
+foreach_setup() ->
+    config:set("couch_views", "max_acceptors", "2", false),
+    config:set("couch_views", "max_workers", "4", false),
+    meck:new(couch_views_server, [passthrough]),
+    meck:new(couch_views_indexer, [passthrough]),
+    meck:expect(couch_views_indexer, init, fun() ->
+        receive pls_accept -> ok end,
+        couch_views_server:accepted(self()),
+        receive pls_die -> ok end
+    end),
+    ok = application:start(couch_views).
+
+
+foreach_teardown(_) ->
+    ok = application:stop(couch_views),
+    meck:unload(),
+    config:delete("couch_views", "max_acceptors", false),
+    config:delete("couch_views", "max_workers", false),
+    ok.
+
+
+max_acceptors_started(_) ->
+    #{max_acceptors := MaxAcceptors, max_workers := MaxWorkers} = get_state(),
+    ?assertEqual(2, MaxAcceptors),
+    ?assertEqual(4, MaxWorkers),
+
+    ?assertEqual(0, maps:size(workers())),
+
+    [Pid1, Pid2] = maps:keys(acceptors()),
+    ?assert(is_pid(Pid1)),
+    ?assert(is_pid(Pid2)),
+    ?assert(is_process_alive(Pid1)),
+    ?assert(is_process_alive(Pid2)).
+
+
+acceptors_become_workers(_) ->
+    ?assertEqual(0, maps:size(workers())),
+
+    InitAcceptors = acceptors(),
+    accept_all(),
+
+    ?assertEqual(2, maps:size(acceptors())),
+    ?assertEqual(2, maps:size(workers())),
+
+    ?assertEqual(InitAcceptors, workers()).
+
+
+handle_worker_death(_) ->
+    [Pid1, Pid2] = maps:keys(acceptors()),
+    accept_all(),
+
+    % One worker exits normal
+    finish_normal([Pid1]),
+    ?assertEqual(2, maps:size(acceptors())),
+    ?assertEqual(1, maps:size(workers())),
+
+    % The other blows up with an error
+    finish_error([Pid2]),
+    ?assertEqual(2, maps:size(acceptors())),
+    ?assertEqual(0, maps:size(workers())).
+
+
+handle_acceptor_death(_) ->
+    [Pid1, Pid2] = maps:keys(acceptors()),
+    finish_error([Pid1]),
+
+    NewAcceptors = acceptors(),
+    ?assertEqual(2, maps:size(NewAcceptors)),
+    ?assert(lists:member(Pid2, maps:keys(NewAcceptors))),
+    ?assert(not lists:member(Pid1, maps:keys(NewAcceptors))).
+
+
+handle_unknown_process_death(_) ->
+    meck:reset(couch_views_server),
+    Pid = self(),
+    whereis(couch_views_server) ! {'EXIT', Pid, blah},
+    meck:wait(1, couch_views_server, terminate,
+        [{unknown_pid_exit, Pid}, '_'], 5000).
+
+
+max_workers_limit_works(_) ->
+    % Accept 2 jobs -> 2 workers
+    accept_all(),
+    ?assertEqual(2, maps:size(workers())),
+
+    % Accept 2 more jobs -> 4 workers
+    accept_all(),
+    ?assertEqual(0, maps:size(acceptors())),
+    ?assertEqual(4, maps:size(workers())),
+
+    % Kill 1 worker -> 1 acceptor and 3 workers
+    [Worker1 | _] = maps:keys(workers()),
+    finish_normal([Worker1]),
+    ?assertEqual(1, maps:size(acceptors())),
+    ?assertEqual(3, maps:size(workers())),
+
+    % Kill 2 more workers -> 2 acceptors and 1 worker
+    [Worker2, Worker3 | _] = maps:keys(workers()),
+    finish_normal([Worker2, Worker3]),
+    ?assertEqual(2, maps:size(acceptors())),
+    ?assertEqual(1, maps:size(workers())),
+
+    % Kill 1 last worker -> 2 acceptors and 0 workers
+    [Worker4] = maps:keys(workers()),
+    finish_normal([Worker4]),
+    ?assertEqual(2, maps:size(acceptors())),
+    ?assertEqual(0, maps:size(workers())).
+
+max_acceptors_greater_than_max_workers(_) ->
+    [Pid1, Pid2] = maps:keys(acceptors()),
+
+    sys:replace_state(couch_views_server, fun(#{} = St) ->
+        St#{max_workers := 1}
+    end),
+
+    accept_all(),
+
+    finish_normal([Pid1]),
+    finish_normal([Pid2]),
+
+    % Only 1 acceptor should start as it is effectively limited by max_workers
+    ?assertEqual(1, maps:size(acceptors())),
+    ?assertEqual(0, maps:size(workers())).
+
+
+% Utility functions
+
+accept_all() ->
+    Acceptors = acceptors(),
+    meck:reset(couch_views_server),
+    [Pid ! pls_accept || Pid <- maps:keys(Acceptors)],
+    meck:wait(maps:size(Acceptors), couch_views_server, handle_call, 3, 5000).
+
+
+acceptors() ->
+    #{acceptors := Acceptors} = get_state(),
+    Acceptors.
+
+
+workers() ->
+    #{workers := Workers} = get_state(),
+    Workers.
+
+
+get_state() ->
+    sys:get_state(couch_views_server, infinity).
+
+
+finish_normal(Workers) when is_list(Workers) ->
+    meck:reset(couch_views_server),
+    [Pid ! pls_die || Pid <- Workers],
+    meck:wait(length(Workers), couch_views_server, handle_info,
+        [{'_', '_', normal}, '_'], 5000).
+
+
+finish_error(Workers) when is_list(Workers) ->
+    meck:reset(couch_views_server),
+    [exit(Pid, badness) || Pid <- Workers],
+    meck:wait(length(Workers), couch_views_server, handle_info,
+        [{'_', '_', badness}, '_'], 5000).
diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl
index 5b15a4c..f5ea379 100644
--- a/src/couch_views/test/couch_views_trace_index_test.erl
+++ b/src/couch_views/test/couch_views_trace_index_test.erl
@@ -88,6 +88,8 @@ trace_single_doc(Db) ->
     meck:expect(couch_jobs, accept, 2, {ok, job, JobData}),
     meck:expect(couch_jobs, update, 3, {ok, job}),
     meck:expect(couch_jobs, finish, 3, ok),
+    meck:expect(couch_views_server, accepted, 1, ok),
+
     put(erlfdb_trace, <<"views_write_one_doc">>),
     couch_views_indexer:init(),