You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ja...@apache.org on 2017/10/08 16:12:58 UTC

[couchdb] 07/10: remove reliance on couch_replicator_clustering, handle cluster state internally

This is an automated email from the ASF dual-hosted git repository.

jan pushed a commit to branch 749-fix-couch_peruser-app-structure
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0400ca548e5ef6eea9380fbeafe095a70bdb0433
Author: Jan Lehnardt <ja...@apache.org>
AuthorDate: Sun Oct 8 11:14:28 2017 +0200

    remove reliance on couch_replicator_clustering, handle cluster state internally
---
 src/couch_peruser/src/couch_peruser.erl | 56 ++++++++++++++++++++++-----------
 1 file changed, 37 insertions(+), 19 deletions(-)

diff --git a/src/couch_peruser/src/couch_peruser.erl b/src/couch_peruser/src/couch_peruser.erl
index a31ff60..791431c 100644
--- a/src/couch_peruser/src/couch_peruser.erl
+++ b/src/couch_peruser/src/couch_peruser.erl
@@ -33,7 +33,8 @@
 ]).
 
 -record(state, {parent, db_name, delete_dbs, changes_pid, changes_ref}).
--record(clusterState, {parent,
+-record(clusterState, {
+    parent,
     db_name,
     delete_dbs,
     states,
@@ -48,10 +49,10 @@
 
 
 start_link() ->
-    gen_server:start_link(?MODULE, [], []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 init() ->
-    couch_log:debug("peruser: starting on node ~p", [node()]),
+    couch_log:debug("peruser: starting on node ~p in pid ~p", [node(), self()]),
     case config:get_boolean("couch_peruser", "enable", false) of
     false ->
         couch_log:debug("peruser: disabled on node ~p", [node()]),
@@ -107,6 +108,8 @@ start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterSta
     end.
 
 init_changes_handler(#state{db_name=DbName} = State) ->
+    % leave for debugging
+    % couch_log:debug("peruser: init_changes_handler() on DbName ~p", [DbName]),
     try
         {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX, sys_db]),
         FunAcc = {fun ?MODULE:changes_handler/3, State},
@@ -120,6 +123,9 @@ init_changes_handler(#state{db_name=DbName} = State) ->
 
 
 changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName}) ->
+    % leave for debugging
+    % couch_log:debug("peruser: changes_handler() on DbName/Doc ~p/~p", [DbName, Doc]),
+
     case couch_util:get_value(<<"id">>, Doc) of
     <<"org.couchdb.user:",User/binary>>=DocId ->
         case should_handle_doc(DbName, DocId) of
@@ -149,22 +155,28 @@ changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName
 changes_handler(_Event, _ResType, State) ->
     State.
 
-should_handle_doc(DbName, DocId) ->
-  case couch_replicator_clustering:owner(DbName, DocId) of
-      unstable ->
-          % todo: when we do proper resume[1], we can return false here
-          % and rely on a module restart when the cluster is stable again
-          % in the meantime, we risk conflicts when the cluster gets unstable
-          % and users are being created.
-          % [1] https://github.com/apache/couchdb/issues/872
-          true;
-      ThisNode when ThisNode =:= node() ->
-          couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]),
-          % do the deed
-          true;
-      _OtherNode ->
-          couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]),
-          false
+should_handle_doc(ShardName, DocId) ->
+    should_handle_doc_int(ShardName, DocId, is_stable()).
+
+should_handle_doc_int(ShardName, DocId, false) ->
+    % When the cluster is unstable, we have already stopped all listeners.
+    % The next stable event will restart all listeners and pick up this
+    % doc change.
+    couch_log:debug("peruser: skipping, cluster unstable ~s/~s", [ShardName, DocId]),
+    false;
+should_handle_doc_int(ShardName, DocId, true) ->
+    DbName = mem3:dbname(ShardName),
+    Live = [erlang:node() | erlang:nodes()],
+    Shards = mem3:shards(DbName, DocId),
+    Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
+    case mem3:owner(DbName, DocId, Nodes) of
+        ThisNode when ThisNode =:= node() ->
+            couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]),
+            % do the deed
+            true;
+        _OtherNode ->
+            couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]),
+            false
   end.
 
 
@@ -258,6 +270,10 @@ exit_changes(ClusterState) ->
         exit(State#state.changes_pid, kill)
     end, ClusterState#clusterState.states).
 
+-spec is_stable() -> true | false.
+is_stable() ->
+    gen_server:call(?MODULE, is_stable).
+
 % Mem3 cluster callbacks
 
 cluster_unstable(Server) ->
@@ -274,6 +290,8 @@ init([]) ->
     ok = subscribe_for_changes(),
     {ok, init()}.
 
+handle_call(is_stable, _From, #clusterState{cluster_stable = IsStable} = State) ->
+    {reply, IsStable, State};
 handle_call(_Msg, _From, State) ->
     {reply, error, State}.
 

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.