You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/03/14 19:26:28 UTC

[33/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Fix job migration during membership change

Specifically, jobs which are in error or crashing state now migrate properly.

Previously jobs migrated only when they checkpointed. Since jobs which are
crashing or trying to fetch their filter code do not checkpoint, they never
migrated and thus could result in duplicate jobs. (New jobs would start on
new node, but jobs would not stop on the old node).

This also simplifies code a bit - removed `no_owner` return value from job
ownership test. Since we do the check in the doc processor, all jobs should
have a proper db and doc.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/99a2b908
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/99a2b908
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/99a2b908

Branch: refs/heads/63012-scheduler
Commit: 99a2b90834cc0ee27c763285c6bced51a4b37cfc
Parents: bc2f053
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Wed Dec 7 11:41:56 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Wed Dec 7 11:41:56 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_clustering.erl    | 23 ++++++++++-----
 src/couch_replicator_db_changes.erl    | 13 +--------
 src/couch_replicator_doc_processor.erl | 43 ++++++++++++++++++++++++++++-
 src/couch_replicator_scheduler_job.erl | 22 +++++----------
 4 files changed, 66 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/99a2b908/src/couch_replicator_clustering.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_clustering.erl b/src/couch_replicator_clustering.erl
index 07b103e..4a6abd7 100644
--- a/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator_clustering.erl
@@ -30,6 +30,7 @@
 
 % public API
 -export([start_link/0, owner/2, is_stable/0]).
+-export([link_cluster_event_listener/1]).
 
 % gen_server callbacks
 -export([init/1, handle_call/3, handle_info/2, handle_cast/2,
@@ -60,14 +61,10 @@ start_link() ->
 
 
 % owner/2 function computes ownership for a {DbName, DocId} tuple
-% Returns `no_owner` in case no DocId is null, `unstable` if cluster
-% is considered to be unstable i.e. it has changed recently, or returns
-% node() which is considered to be the owner.
+% Returns `unstable` if cluster is considered to be unstable i.e. it has
+% changed recently, or returns node() which is the owner.
 %
--spec owner(Dbname :: binary(), DocId :: binary() | null) ->
-    node() | no_owner | unstable.
-owner(_DbName, null) ->
-    no_owner;
+-spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
 owner(<<"shards/", _/binary>> = DbName, DocId) ->
     case is_stable() of
         false ->
@@ -84,6 +81,18 @@ is_stable() ->
     gen_server:call(?MODULE, is_stable).
 
 
+% Convenience function for gen_servers to subscribe to {cluster, stable} and
+% {cluster, unstable} events from couch_replicator clustering module.
+-spec link_cluster_event_listener(pid()) -> pid().
+link_cluster_event_listener(GenServer) when is_pid(GenServer) ->
+    CallbackFun =
+        fun(Event = {cluster, _}) -> gen_server:cast(GenServer, Event);
+           (_) -> ok
+        end,
+    {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
+    Pid.
+
+
 % gen_server callbacks
 
 init([]) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/99a2b908/src/couch_replicator_db_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_db_changes.erl b/src/couch_replicator_db_changes.erl
index 78ec069..924c24f 100644
--- a/src/couch_replicator_db_changes.erl
+++ b/src/couch_replicator_db_changes.erl
@@ -32,7 +32,7 @@ start_link() ->
 
 
 init([]) ->
-    EvtPid = start_link_cluster_event_listener(),
+    EvtPid = couch_replicator_clustering:link_cluster_event_listener(self()),
     State = #state{event_listener = EvtPid, mdb_changes = nil},
     case couch_replicator_clustering:is_stable() of
         true ->
@@ -88,14 +88,3 @@ stop_mdb_changes(#state{mdb_changes = Pid} = State) ->
     unlink(Pid),
     exit(Pid, kill),
     State#state{mdb_changes = nil}.
-
-
--spec start_link_cluster_event_listener() -> pid().
-start_link_cluster_event_listener() ->
-    Server = self(),
-    CallbackFun =
-        fun(Event = {cluster, _}) -> gen_server:cast(Server, Event);
-           (_) -> ok
-        end,
-    {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
-    Pid.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/99a2b908/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 6851542..035a7ec 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -166,6 +166,7 @@ start_link() ->
 
 init([]) ->
     ?MODULE = ets:new(?MODULE, [ordered_set, named_table, {keypos, #rdoc.id}]),
+    couch_replicator_clustering:link_cluster_event_listener(self()),
     {ok, nil}.
 
 
@@ -189,6 +190,14 @@ handle_call({clean_up_replications, DbName}, _From, State) ->
     ok = removed_db(DbName),
     {reply, ok, State}.
 
+handle_cast({cluster, unstable}, State) ->
+    % Ignoring unstable state transition
+    {noreply, State};
+
+handle_cast({cluster, stable}, State) ->
+    % Membership changed, recheck ownership of all replication documents
+    nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE),
+    {noreply, State};
 
 handle_cast(Msg, State) ->
     {stop, {error, unexpected_message, Msg}, State}.
@@ -530,6 +539,21 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
     lists:member(State, States).
 
 
+-spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
+cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
+    case couch_replicator_clustering:owner(DbName, DocId) of
+        unstable ->
+            nil;
+        ThisNode when ThisNode =:= node() ->
+            nil;
+        OtherNode ->
+            Msg = "Replication doc ~p:~p with id ~p usurped by node ~p",
+            couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]),
+            removed_doc(Id),
+            nil
+    end.
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
@@ -558,7 +582,8 @@ doc_processor_test_() ->
             t_failed_change(),
             t_change_for_different_node(),
             t_change_when_cluster_unstable(),
-            t_ejson_docs()
+            t_ejson_docs(),
+            t_cluster_membership_foldl()
         ]
     }.
 
@@ -707,6 +732,21 @@ t_ejson_docs() ->
     end).
 
 
+% Check that when cluster membership changes, records from the doc processor
+% and the job scheduler get removed
+t_cluster_membership_foldl() ->
+   ?_test(begin
+        mock_existing_jobs_lookup([test_rep(?R1)]),
+        ?assertEqual(ok, process_change(?DB, change())),
+        meck:expect(couch_replicator_clustering, owner, 2, different_node),
+        ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
+        gen_server:cast(?MODULE, {cluster, stable}),
+        timer:sleep(100),
+        ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
+        ?assert(removed_job(?R1))
+   end).
+
+
 normalize_rep_test_() ->
     {
         setup,
@@ -745,6 +785,7 @@ setup() ->
     meck:expect(config, get, fun(_, _, Default) -> Default end),
     meck:expect(config, listen_for_changes, 2, ok),
     meck:expect(couch_replicator_clustering, owner, 2, node()),
+    meck:expect(couch_replicator_clustering, link_cluster_event_listener, 1, ok),
     meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 3, wref),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
     meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/99a2b908/src/couch_replicator_scheduler_job.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_job.erl b/src/couch_replicator_scheduler_job.erl
index 4dcecb4..1c9faaf 100644
--- a/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator_scheduler_job.erl
@@ -349,21 +349,13 @@ handle_cast({db_compacted, DbName},
     {noreply, State#rep_state{target = NewTarget}};
 
 handle_cast(checkpoint, State) ->
-    #rep_state{rep_details = #rep{db_name = DbName, doc_id = DocId} = Rep} = State,
-    case couch_replicator_clustering:owner(DbName, DocId) of
-    Owner when Owner =:= node(); Owner =:= no_owner; Owner =:= unstable ->
-        case do_checkpoint(State) of
-        {ok, NewState} ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, success]),
-            {noreply, NewState#rep_state{timer = start_timer(State)}};
-        Error ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
-            {stop, Error, State}
-        end;
-    Other when Other =/= node() ->
-        couch_log:notice("Replication `~s` usurped by ~s (triggered by `~s`)",
-            [pp_rep_id(Rep#rep.id), Other, DocId]),
-        {stop, shutdown, State}
+    case do_checkpoint(State) of
+    {ok, NewState} ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+        {noreply, NewState#rep_state{timer = start_timer(State)}};
+    Error ->
+        couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+        {stop, Error, State}
     end;
 
 handle_cast({report_seq, Seq},