You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/08/14 15:35:21 UTC

[couchdb] branch master updated: Fix replication rescheduling Running < MaxJobs corner case

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 36fd9da  Fix replication rescheduling Running < MaxJobs corner case
36fd9da is described below

commit 36fd9dab64814475221e72022bd9227cfbd034d2
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Aug 13 16:53:44 2019 -0400

    Fix replication rescheduling Running < MaxJobs corner case
    
    Previously, when the total number of replication jobs exceeded `MaxJobs` and
    some jobs crashed, additional jobs were not started immediately to bring the
    running total back up to the `MaxJobs` limit. Then, during rescheduling, the
    `Running == MaxJobs, Pending > 0` guard would fail and jobs would not rotate.
    In other words, if at least one job crashed, rotation didn't happen.
    
    The fix is to simplify the rotation logic so that it also handles the
    `Running < MaxJobs` case. First, up to `Churn` jobs are stopped, then enough
    jobs are started to reach the `MaxJobs` limit.
    
    The rotation logic now also covers what the `start_pending_jobs/3` call did,
    so there is no need to call it separately before rotation happens.
---
 .../src/couch_replicator_scheduler.erl             | 82 +++++++++++++---------
 1 file changed, 49 insertions(+), 33 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index e3dbede..7fe417a 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -494,7 +494,10 @@ start_jobs(Count, State) ->
 
 
 -spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer().
-stop_jobs(Count, IsContinuous, State) ->
+stop_jobs(Count, _, _) when is_integer(Count), Count =< 0 ->
+    0;
+
+stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
     Running0 = running_jobs(),
     ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
     Running1 = lists:filter(ContinuousPred, Running0),
@@ -723,35 +726,25 @@ reset_job_process(#job{} = Job) ->
 
 -spec reschedule(#state{}) -> ok.
 reschedule(State) ->
-    Running = running_job_count(),
-    Pending = pending_job_count(),
-    stop_excess_jobs(State, Running),
-    start_pending_jobs(State, Running, Pending),
-    rotate_jobs(State, Running, Pending),
-    update_running_jobs_stats(State#state.stats_pid),
-    ok.
+    StopCount = stop_excess_jobs(State, running_job_count()),
+    rotate_jobs(State, StopCount),
+    update_running_jobs_stats(State#state.stats_pid).
 
 
--spec stop_excess_jobs(#state{}, non_neg_integer()) -> ok.
+-spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer().
 stop_excess_jobs(State, Running) ->
     #state{max_jobs=MaxJobs} = State,
-    StopCount = Running - MaxJobs,
-    if StopCount =< 0 -> ok; true ->
-        Stopped = stop_jobs(StopCount, true, State),
-        OneshotLeft = StopCount - Stopped,
-        if OneshotLeft =< 0 -> ok; true ->
-            stop_jobs(OneshotLeft, false, State),
-            ok
-        end
-    end.
+    StopCount = max(0, Running - MaxJobs),
+    Stopped = stop_jobs(StopCount, true, State),
+    OneshotLeft = StopCount - Stopped,
+    stop_jobs(OneshotLeft, false, State),
+    StopCount.
 
 
 start_pending_jobs(State) ->
-    start_pending_jobs(State, running_job_count(), pending_job_count()).
-
-
-start_pending_jobs(State, Running, Pending) ->
     #state{max_jobs=MaxJobs} = State,
+    Running = running_job_count(),
+    Pending = pending_job_count(),
     if Running < MaxJobs, Pending > 0 ->
         start_jobs(MaxJobs - Running, State);
     true ->
@@ -759,13 +752,19 @@ start_pending_jobs(State, Running, Pending) ->
     end.
 
 
--spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok.
-rotate_jobs(State, Running, Pending) ->
+-spec rotate_jobs(#state{}, non_neg_integer()) -> ok.
+rotate_jobs(State, ChurnSoFar) ->
     #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
-    if Running == MaxJobs, Pending > 0 ->
-        RotateCount = lists:min([Pending, Running, MaxChurn]),
-        StopCount = stop_jobs(RotateCount, true, State),
-        start_jobs(StopCount, State);
+    Running = running_job_count(),
+    Pending = pending_job_count(),
+    % Reduce MaxChurn by the number of already stopped jobs in the
+    % current rescheduling cycle.
+    Churn = max(0, MaxChurn - ChurnSoFar),
+    if Running =< MaxJobs ->
+        StopCount = lists:min([Pending, Running, Churn]),
+        stop_jobs(StopCount, true, State),
+        StartCount = max(0, MaxJobs - running_job_count()),
+        start_jobs(StartCount, State);
     true ->
         ok
     end.
@@ -1047,6 +1046,7 @@ scheduler_test_() ->
             t_excess_prefer_continuous_first(),
             t_stop_oldest_first(),
             t_start_oldest_first(),
+            t_jobs_churn_even_if_not_all_max_jobs_are_running(),
             t_dont_stop_if_nothing_pending(),
             t_max_churn_limits_number_of_rotated_jobs(),
             t_existing_jobs(),
@@ -1056,7 +1056,7 @@ scheduler_test_() ->
             t_rotate_continuous_only_if_mixed(),
             t_oneshot_dont_get_starting_priority(),
             t_oneshot_will_hog_the_scheduler(),
-            t_if_excess_is_trimmed_rotation_doesnt_happen(),
+            t_if_excess_is_trimmed_rotation_still_happens(),
             t_if_transient_job_crashes_it_gets_removed(),
             t_if_permanent_job_crashes_it_stays_in_ets(),
             t_job_summary_running(),
@@ -1177,10 +1177,10 @@ t_stop_oldest_first() ->
             continuous_running(5)
         ],
         setup_jobs(Jobs),
-        reschedule(mock_state(2)),
+        reschedule(mock_state(2, 1)),
         ?assertEqual({2, 1}, run_stop_count()),
         ?assertEqual([4], jobs_stopped()),
-        reschedule(mock_state(1)),
+        reschedule(mock_state(1, 1)),
         ?assertEqual([7], jobs_running())
     end).
 
@@ -1192,6 +1192,22 @@ t_start_oldest_first() ->
         ?assertEqual({1, 2}, run_stop_count()),
         ?assertEqual([2], jobs_running()),
         reschedule(mock_state(2)),
+        ?assertEqual({2, 1}, run_stop_count()),
+        % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should
+        % be running.
+        ?assertEqual([2], jobs_stopped())
+    end).
+
+
+t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
+    ?_test(begin
+        setup_jobs([
+            continuous_running(7),
+            continuous(2),
+            continuous(5)
+        ]),
+        reschedule(mock_state(2, 2)),
+        ?assertEqual({2, 1}, run_stop_count()),
         ?assertEqual([7], jobs_stopped())
     end).
 
@@ -1289,7 +1305,7 @@ t_oneshot_will_hog_the_scheduler() ->
     end).
 
 
-t_if_excess_is_trimmed_rotation_doesnt_happen() ->
+t_if_excess_is_trimmed_rotation_still_happens() ->
     ?_test(begin
         Jobs = [
             continuous(1),
@@ -1298,7 +1314,7 @@ t_if_excess_is_trimmed_rotation_doesnt_happen() ->
         ],
         setup_jobs(Jobs),
         reschedule(mock_state(1)),
-        ?assertEqual([3], jobs_running())
+        ?assertEqual([1], jobs_running())
     end).