You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/03/14 19:26:32 UTC

[37/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Make sure doc processor workers do not re-add deleted replication jobs

Previously, especially in case of filtered replications, doc processor workers
could inadvertently re-add a replication job after it was deleted.

After fetching the filter code and computing the replication id, workers
would try to add the replication job to the scheduler. They did that without
checking whether the replication document had already been deleted, or whether
another worker had been spawned for it in the meantime.

The fix is to create a unique worker reference and pass it to the worker.
Before adding the job, the worker confirms that it is still the current worker
and that the document was not deleted; otherwise it exits with an `ignore`
result.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/700a9295
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/700a9295
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/700a9295

Branch: refs/heads/63012-scheduler
Commit: 700a9295590d789b33c34999c75a4410e22eb43d
Parents: f2b3ac5
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Dec 7 20:51:45 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Jan 11 15:00:23 2017 -0500

----------------------------------------------------------------------
 src/couch_replicator.hrl                      |  2 +
 src/couch_replicator_doc_processor.erl        | 48 +++++++++--
 src/couch_replicator_doc_processor_worker.erl | 94 +++++++++++++++-------
 3 files changed, 107 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/700a9295/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index 339e162..b8669e8 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -30,11 +30,13 @@
 -type seconds() :: non_neg_integer().
 -type rep_start_result() ::
     {ok, rep_id()} |
+    ignore |
     {temporary_error, binary()} |
     {permanent_failure, binary()}.
 
 
 -record(doc_worker_result, {
     id :: db_doc_id(),
+    wref :: reference(),
     result :: rep_start_result()
 }).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/700a9295/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 402a72f..9c2e2b3 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -16,6 +16,7 @@
 -export([start_link/0]).
 -export([docs/1, doc/2]).
 -export([update_docs/0]).
+-export([get_worker_ref/1]).
 
 % multidb changes callback
 -export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -88,6 +89,18 @@ db_change(DbName, {ChangeProps} = Change, Server) ->
     Server.
 
 
+-spec get_worker_ref(db_doc_id()) -> reference() | nil.
+get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
+    case ets:lookup(?MODULE, {DbName, DocId}) of
+        [#rdoc{worker = WRef}] when is_reference(WRef) ->
+            WRef;
+        [#rdoc{worker = nil}] ->
+            nil;
+        [] ->
+            nil
+    end.
+
+
 % Private helpers for multidb changes API, these updates into the doc
 % processor gen_server
 
@@ -203,8 +216,8 @@ handle_cast(Msg, State) ->
     {stop, {error, unexpected_message, Msg}, State}.
 
 
-handle_info({'DOWN', Ref, _, _, #doc_worker_result{id = Id, result = Res}},
-        State) ->
+handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
+        result = Res}}, State) ->
     ok = worker_returned(Ref, Id, Res),
     {noreply, State};
 
@@ -324,6 +337,9 @@ worker_returned(Ref, Id, {ok, RepId}) ->
     end,
     ok;
 
+worker_returned(_Ref, _Id, ignore) ->
+    ok;
+
 worker_returned(Ref, Id, {temporary_error, Reason}) ->
     case ets:lookup(?MODULE, Id) of
     [#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
@@ -413,8 +429,9 @@ maybe_start_worker(Id) ->
         ok;
     [#rdoc{rep = Rep} = Doc] ->
         Wait = get_worker_wait(Doc),
-        WRef = couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait),
-        true = ets:insert(?MODULE, Doc#rdoc{worker = WRef}),
+        Ref = make_ref(),
+        true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
+        couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
         ok
     end.
 
@@ -773,6 +790,22 @@ normalize_rep_test_() ->
     }.
 
 
+get_worker_ref_test_() ->
+    {
+        setup,
+        fun() -> ets:new(?MODULE, [named_table, public, {keypos, #rdoc.id}]) end,
+        fun(_) -> ets:delete(?MODULE) end,
+        ?_test(begin
+            Id = {<<"db">>, <<"doc">>},
+            ?assertEqual(nil, get_worker_ref(Id)),
+            ets:insert(?MODULE, #rdoc{id = Id, worker = nil}),
+            ?assertEqual(nil, get_worker_ref(Id)),
+            Ref = make_ref(),
+            ets:insert(?MODULE, #rdoc{id = Id, worker = Ref}),
+            ?assertEqual(Ref, get_worker_ref(Id))
+        end)
+    }.
+
 
 % Test helper functions
 
@@ -786,7 +819,7 @@ setup() ->
     meck:expect(config, listen_for_changes, 2, ok),
     meck:expect(couch_replicator_clustering, owner, 2, node()),
     meck:expect(couch_replicator_clustering, link_cluster_event_listener, 1, ok),
-    meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 3, wref),
+    meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
     meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
     meck:expect(couch_replicator_docs, update_failed, 4, ok),
@@ -804,9 +837,8 @@ removed_state_fields() ->
     meck:called(couch_replicator_docs, remove_state_fields, [?DB, ?DOC1]).
 
 
-started_worker(Id) ->
-    meck:called(couch_replicator_doc_processor_worker, spawn_worker,
-        [Id, '_', '_']).
+started_worker(_Id) ->
+    1 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker, 4).
 
 
 removed_job(Id) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/700a9295/src/couch_replicator_doc_processor_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator_doc_processor_worker.erl
index a6bdeef..30a6988 100644
--- a/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator_doc_processor_worker.erl
@@ -12,7 +12,7 @@
 
 -module(couch_replicator_doc_processor_worker).
 
--export([spawn_worker/3]).
+-export([spawn_worker/4]).
 
 -include("couch_replicator.hrl").
 
@@ -27,19 +27,19 @@
 % a worker will then exit with the #doc_worker_result{} record within
 % ?WORKER_TIMEOUT_MSEC timeout period.A timeout is considered a `temporary_error`.
 % Result will be sent as the `Reason` in the {'DOWN',...} message.
--spec spawn_worker(db_doc_id(), #rep{}, seconds()) -> reference().
-spawn_worker(Id, Rep, WaitSec) ->
-    {_Pid, WRef} = spawn_monitor(fun() -> worker_fun(Id, Rep, WaitSec) end),
-    WRef.
+-spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
+spawn_worker(Id, Rep, WaitSec, WRef) ->
+    {Pid, _Ref} = spawn_monitor(fun() -> worker_fun(Id, Rep, WaitSec, WRef) end),
+    Pid.
 
 
 % Private functions
 
--spec worker_fun(db_doc_id(), #rep{}, seconds()) -> no_return().
-worker_fun(Id, Rep, WaitSec) ->
+-spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return().
+worker_fun(Id, Rep, WaitSec, WRef) ->
     timer:sleep(WaitSec * 1000),
     Fun = fun() ->
-        try maybe_start_replication(Id, Rep) of
+        try maybe_start_replication(Id, Rep, WRef) of
             Res ->
                 exit(Res)
         catch
@@ -52,7 +52,7 @@ worker_fun(Id, Rep, WaitSec) ->
     {Pid, Ref} = spawn_monitor(Fun),
     receive
         {'DOWN', Ref, _, Pid, Result} ->
-            exit(#doc_worker_result{id = Id, result = Result})
+            exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
     after ?WORKER_TIMEOUT_MSEC ->
         erlang:demonitor(Ref, [flush]),
         exit(Pid, kill),
@@ -61,7 +61,7 @@ worker_fun(Id, Rep, WaitSec) ->
         Msg = io_lib:format("Replication for db ~p doc ~p failed to start due "
             "to timeout after ~B seconds", [DbName, DocId, TimeoutSec]),
         Result = {temporary_error, couch_util:to_binary(Msg)},
-        exit(#doc_worker_result{id = Id, result = Result})
+        exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
     end.
 
 
@@ -69,9 +69,11 @@ worker_fun(Id, Rep, WaitSec) ->
 % rep_start_result(), also throws {filter_fetch_error, Reason} if cannot fetch filter.
 % It can also block for an indeterminate amount of time while fetching the
 % filter.
-maybe_start_replication(Id, RepWithoutId) ->
+maybe_start_replication(Id, RepWithoutId, WRef) ->
     Rep = couch_replicator_docs:update_rep_id(RepWithoutId),
-    case maybe_add_job_to_scheduler(Id, Rep) of
+    case maybe_add_job_to_scheduler(Id, Rep, WRef) of
+    ignore ->
+        ignore;
     {ok, RepId} ->
         {ok, RepId};
     {temporary_error, Reason} ->
@@ -84,18 +86,23 @@ maybe_start_replication(Id, RepWithoutId) ->
     end.
 
 
--spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}) -> rep_start_result().
-maybe_add_job_to_scheduler({_DbName, DocId}, Rep) ->
+-spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) ->
+   rep_start_result().
+maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) ->
     RepId = Rep#rep.id,
     case couch_replicator_scheduler:rep_state(RepId) of
     nil ->
-        case couch_replicator_scheduler:add_job(Rep) of
-        ok ->
-           ok;
-        {error, already_added} ->
-            couch_log:warning("replicator scheduler: ~p was already added", [Rep])
-        end,
-        {ok, RepId};
+        % Before adding a job check that this worker is still the current
+        % worker. This is to handle a race condition where a worker which was
+        % sleeping and then checking a replication filter may inadvertently re-add
+        % a replication which was already deleted.
+        case couch_replicator_doc_processor:get_worker_ref({DbName, DocId}) of
+        WRef ->
+            ok = couch_replicator_scheduler:add_job(Rep),
+            {ok, RepId};
+        _NilOrOtherWRef ->
+            ignore
+        end;
     #rep{doc_id = DocId} ->
         {ok, RepId};
     #rep{doc_id = null} ->
@@ -130,7 +137,9 @@ doc_processor_worker_test_() ->
             t_already_running_same_docid(),
             t_already_running_transient(),
             t_already_running_other_db_other_doc(),
-            t_spawn_worker()
+            t_spawn_worker(),
+            t_ignore_if_doc_deleted(),
+            t_ignore_if_worker_ref_does_not_match()
         ]
     }.
 
@@ -140,7 +149,7 @@ t_should_add_job() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep)),
+       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
        ?assert(added_job())
    end).
 
@@ -151,7 +160,7 @@ t_already_running_same_docid() ->
        Id = {?DB, ?DOC1},
        mock_already_running(?DB, ?DOC1),
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep)),
+       ?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
        ?assert(did_not_add_job())
    end).
 
@@ -162,7 +171,7 @@ t_already_running_transient() ->
        Id = {?DB, ?DOC1},
        mock_already_running(null, null),
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep)),
+       ?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep, nil)),
        ?assert(did_not_add_job())
    end).
 
@@ -174,7 +183,7 @@ t_already_running_other_db_other_doc() ->
        Id = {?DB, ?DOC1},
        mock_already_running(<<"otherdb">>, <<"otherdoc">>),
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep)),
+       ?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep, nil)),
        ?assert(did_not_add_job()),
        1 == meck:num_calls(couch_replicator_docs, update_failed, '_')
    end).
@@ -185,15 +194,41 @@ t_spawn_worker() ->
    ?_test(begin
        Id = {?DB, ?DOC1},
        Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
-       Ref = spawn_worker(Id, Rep, 0),
-       Res = receive  {'DOWN', Ref, _, _, Reason} -> Reason
+       WRef = make_ref(),
+       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
+       Pid = spawn_worker(Id, Rep, 0, WRef),
+       Res = receive  {'DOWN', _Ref, process, Pid, Reason} -> Reason
            after 1000 -> timeout end,
-       Expect = #doc_worker_result{id = Id, result = {ok, ?R1}},
+       Expect = #doc_worker_result{id = Id, wref = WRef, result = {ok, ?R1}},
        ?assertEqual(Expect, Res),
        ?assert(added_job())
    end).
 
 
+% Should not add job if by the time worker got to fetching the filter
+% and getting a replication id, replication doc was deleted
+t_ignore_if_doc_deleted() ->
+   ?_test(begin
+       Id = {?DB, ?DOC1},
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
+       ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
+       ?assertNot(added_job())
+   end).
+
+
+% Should not add job if by the time worker got to fetching the filter
+% and building a replication id, another worker was spawned.
+t_ignore_if_worker_ref_does_not_match() ->
+    ?_test(begin
+       Id = {?DB, ?DOC1},
+       Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+       meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, make_ref()),
+       ?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
+       ?assertNot(added_job())
+   end).
+
+
 % Test helper functions
 
 setup() ->
@@ -202,6 +237,7 @@ setup() ->
     meck:expect(couch_server, get_uuid, 0, this_is_snek),
     meck:expect(couch_replicator_docs, update_failed, 4, ok),
     meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
+    meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
     ok.