You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/08/13 21:26:14 UTC

[couchdb] branch fix-replicator-reschedule updated (0eb524a -> 6639b9f)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch fix-replicator-reschedule
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 0eb524a  Fix replication rescheduling Running < MaxJobs corner case
     new 6639b9f  Fix replication rescheduling Running < MaxJobs corner case

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (0eb524a)
            \
             N -- N -- N   refs/heads/fix-replicator-reschedule (6639b9f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[couchdb] 01/01: Fix replication rescheduling Running < MaxJobs corner case

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch fix-replicator-reschedule
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6639b9f02b79530cfaf14d7867cacf3971ffaab4
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Aug 13 16:53:44 2019 -0400

    Fix replication rescheduling Running < MaxJobs corner case
    
    Previously, when the total number of replication jobs exceeded `MaxJobs`
    and some jobs crashed, additional jobs didn't start immediately to bring
    the running total up to the `MaxJobs` limit. Then, during rescheduling, the
    `Running == MaxJobs, Pending > 0` guard would fail and jobs would not
    rotate. In other words, if at least one job crashed, rotation didn't happen.
    
    The fix is to simplify the rotation logic to handle the `Running < MaxJobs`
    case. First, up to `Churn` jobs are stopped, then enough jobs are started
    to reach the `MaxJobs` limit. The rotation step now also covers the
    `start_pending_jobs/3` logic, so there is no need to call that before
    rotation happens.
---
 .../src/couch_replicator_scheduler.erl             | 70 ++++++++++++++--------
 1 file changed, 44 insertions(+), 26 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index e3dbede..e0bb10d 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -723,35 +723,30 @@ reset_job_process(#job{} = Job) ->
 
 -spec reschedule(#state{}) -> ok.
 reschedule(State) ->
-    Running = running_job_count(),
-    Pending = pending_job_count(),
-    stop_excess_jobs(State, Running),
-    start_pending_jobs(State, Running, Pending),
-    rotate_jobs(State, Running, Pending),
-    update_running_jobs_stats(State#state.stats_pid),
-    ok.
+    StopCount = stop_excess_jobs(State, running_job_count()),
+    rotate_jobs(State, StopCount),
+    update_running_jobs_stats(State#state.stats_pid).
 
 
--spec stop_excess_jobs(#state{}, non_neg_integer()) -> ok.
+-spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer().
 stop_excess_jobs(State, Running) ->
     #state{max_jobs=MaxJobs} = State,
-    StopCount = Running - MaxJobs,
-    if StopCount =< 0 -> ok; true ->
+    StopCount = max(0, Running - MaxJobs),
+    if StopCount == 0 -> ok; true ->
         Stopped = stop_jobs(StopCount, true, State),
         OneshotLeft = StopCount - Stopped,
         if OneshotLeft =< 0 -> ok; true ->
             stop_jobs(OneshotLeft, false, State),
             ok
         end
-    end.
+    end,
+    StopCount.
 
 
 start_pending_jobs(State) ->
-    start_pending_jobs(State, running_job_count(), pending_job_count()).
-
-
-start_pending_jobs(State, Running, Pending) ->
     #state{max_jobs=MaxJobs} = State,
+    Running = running_job_count(),
+    Pending = pending_job_count(),
     if Running < MaxJobs, Pending > 0 ->
         start_jobs(MaxJobs - Running, State);
     true ->
@@ -759,13 +754,19 @@ start_pending_jobs(State, Running, Pending) ->
     end.
 
 
--spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok.
-rotate_jobs(State, Running, Pending) ->
+-spec rotate_jobs(#state{}, non_neg_integer()) -> ok.
+rotate_jobs(State, ChurnSoFar) ->
     #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
-    if Running == MaxJobs, Pending > 0 ->
-        RotateCount = lists:min([Pending, Running, MaxChurn]),
-        StopCount = stop_jobs(RotateCount, true, State),
-        start_jobs(StopCount, State);
+    Running = running_job_count(),
+    Pending = pending_job_count(),
+    % Reduce MaxChurn by the number of already stopped jobs in the
+    % current rescheduling cycle.
+    Churn = max(0, MaxChurn - ChurnSoFar),
+    if Running =< MaxJobs, Pending > 0 ->
+        StopCount = lists:min([Pending, Running, Churn]),
+        stop_jobs(StopCount, true, State),
+        StartCount = max(0, MaxJobs - running_job_count()),
+        start_jobs(StartCount, State);
     true ->
         ok
     end.
@@ -1047,6 +1048,7 @@ scheduler_test_() ->
             t_excess_prefer_continuous_first(),
             t_stop_oldest_first(),
             t_start_oldest_first(),
+            t_jobs_churn_even_if_not_all_max_jobs_are_running(),
             t_dont_stop_if_nothing_pending(),
             t_max_churn_limits_number_of_rotated_jobs(),
             t_existing_jobs(),
@@ -1056,7 +1058,7 @@ scheduler_test_() ->
             t_rotate_continuous_only_if_mixed(),
             t_oneshot_dont_get_starting_priority(),
             t_oneshot_will_hog_the_scheduler(),
-            t_if_excess_is_trimmed_rotation_doesnt_happen(),
+            t_if_excess_is_trimmed_rotation_still_happens(),
             t_if_transient_job_crashes_it_gets_removed(),
             t_if_permanent_job_crashes_it_stays_in_ets(),
             t_job_summary_running(),
@@ -1177,10 +1179,10 @@ t_stop_oldest_first() ->
             continuous_running(5)
         ],
         setup_jobs(Jobs),
-        reschedule(mock_state(2)),
+        reschedule(mock_state(2, 1)),
         ?assertEqual({2, 1}, run_stop_count()),
         ?assertEqual([4], jobs_stopped()),
-        reschedule(mock_state(1)),
+        reschedule(mock_state(1, 1)),
         ?assertEqual([7], jobs_running())
     end).
 
@@ -1192,6 +1194,22 @@ t_start_oldest_first() ->
         ?assertEqual({1, 2}, run_stop_count()),
         ?assertEqual([2], jobs_running()),
         reschedule(mock_state(2)),
+        ?assertEqual({2, 1}, run_stop_count()),
+        % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should
+        % be running.
+        ?assertEqual([2], jobs_stopped())
+    end).
+
+
+t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
+    ?_test(begin
+        setup_jobs([
+            continuous_running(7),
+            continuous(2),
+            continuous(5)
+        ]),
+        reschedule(mock_state(2, 2)),
+        ?assertEqual({2, 1}, run_stop_count()),
         ?assertEqual([7], jobs_stopped())
     end).
 
@@ -1289,7 +1307,7 @@ t_oneshot_will_hog_the_scheduler() ->
     end).
 
 
-t_if_excess_is_trimmed_rotation_doesnt_happen() ->
+t_if_excess_is_trimmed_rotation_still_happens() ->
     ?_test(begin
         Jobs = [
             continuous(1),
@@ -1298,7 +1316,7 @@ t_if_excess_is_trimmed_rotation_doesnt_happen() ->
         ],
         setup_jobs(Jobs),
         reschedule(mock_state(1)),
-        ?assertEqual([3], jobs_running())
+        ?assertEqual([1], jobs_running())
     end).