You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/22 21:17:11 UTC

[couchdb] 02/02: WIP - read repair

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 7a51482229d4ec5d609422d827c33c199510734b
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Mar 22 16:01:58 2018 -0500

    WIP - read repair
---
 src/fabric/src/fabric_doc_open.erl      | 18 +++++++++++-------
 src/fabric/src/fabric_doc_open_revs.erl | 31 ++++++++++++++++---------------
 src/fabric/src/fabric_rpc.erl           |  3 ++-
 3 files changed, 29 insertions(+), 23 deletions(-)

diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
index b974880..445cfbb 100644
--- a/src/fabric/src/fabric_doc_open.erl
+++ b/src/fabric/src/fabric_doc_open.erl
@@ -25,8 +25,8 @@
     r,
     state,
     replies,
-    q_reply,
-    replies_by_node=[] %[{Node, Reply}] used for checking if a doc is purged
+    node_id_revs = [],
+    q_reply
 }).
 
 
@@ -84,8 +84,13 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
     end;
 handle_message(Reply, Worker, Acc) ->
     NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
-    NewNReplies = [{Worker#shard.node, Reply}|Acc#acc.replies_by_node],
-    NewAcc = Acc#acc{replies = NewReplies, replies_by_node = NewNReplies},
+    NewNodeIdRevs = case Reply of % note which node returned which rev
+        {ok, #doc{id = Id, revs = {Pos, [Rev | _]}}} ->
+            [{Worker#shard.node, {Id, [{Pos, Rev}]}} | Acc#acc.node_id_revs];
+        _ -> % non-doc replies carry no revision to record
+            Acc#acc.node_id_revs
+    end,
+    NewAcc = Acc#acc{replies = NewReplies, node_id_revs = NewNodeIdRevs},
     case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
     {true, QuorumReply} ->
         fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -124,15 +129,14 @@ is_r_met(Workers, Replies, R) ->
         no_more_workers
     end.
 
-read_repair(#acc{dbname=DbName, replies=Replies, replies_by_node=NReplies0}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, node_id_revs=NodeIdRevs}) ->
     Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
-    NReplies = [{Node, Doc} || {Node, {ok, #doc{}=Doc}} <- NReplies0],
     case Docs of
     % omit local docs from read repair
     [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
         choose_reply(Docs);
     [#doc{id=Id} | _] ->
-        Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NReplies}],
+        Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeIdRevs}],
         Res = fabric:update_docs(DbName, Docs, Opts),
         case Res of
             {ok, []} ->
diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl
index 61e8afd..a020ba6 100644
--- a/src/fabric/src/fabric_doc_open_revs.erl
+++ b/src/fabric/src/fabric_doc_open_revs.erl
@@ -29,8 +29,8 @@
     revs,
     latest,
     replies = [],
+    node_id_revs = [],
-    repair = false,
-    replies_by_node=[] %[{Node, Reply}] used for read_repair
+    repair = false
 }).
 
 go(DbName, Id, Revs, Options) ->
@@ -83,7 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         worker_count = WorkerCount,
         workers = Workers,
         replies = PrevReplies,
-        replies_by_node = PrevNReplies,
+        node_id_revs = PrevNodeIdRevs,
         r = R,
         revs = Revs,
         latest = Latest,
@@ -109,9 +109,14 @@ handle_message({ok, RawReplies}, Worker, State) ->
             {NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies),
             {NewReplies0, MinCount >= R, false}
     end,
-    NewNReplies = case Worker of
-        nil -> PrevNReplies;
-        _ -> [{Worker#shard.node, RawReplies}|PrevNReplies]
+    NewNodeIdRevs = if Worker == nil -> PrevNodeIdRevs; true ->
+        Node = Worker#shard.node,
+        lists:foldl(fun
+            ({ok, #doc{id = Id, revs = {Pos, [Rev | _]}}}, Acc) ->
+                [{Node, {Id, [{Pos, Rev}]}} | Acc]; % {node, {docid, [rev]}}
+            (_, Acc) ->
+                Acc
+        end, PrevNodeIdRevs, RawReplies)
     end,
 
     Complete = (ReplyCount =:= (WorkerCount - 1)),
@@ -123,7 +128,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
                     DbName,
                     IsTree,
                     NewReplies,
-                    NewNReplies,
+                    NewNodeIdRevs,
                     ReplyCount + 1,
                     InRepair orelse Repair
                 ),
@@ -131,7 +136,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         false ->
             {ok, State#state{
                 replies = NewReplies,
-                replies_by_node = NewNReplies,
+                node_id_revs = NewNodeIdRevs,
                 reply_count = ReplyCount + 1,
                 workers = lists:delete(Worker, Workers),
                 repair = InRepair orelse Repair
@@ -188,7 +193,7 @@ dict_replies(Dict, [Reply | Rest]) ->
     dict_replies(NewDict, Rest).
 
 
-maybe_read_repair(Db, IsTree, Replies, NodeReplies0, ReplyCount, DoRepair) ->
+maybe_read_repair(Db, IsTree, Replies, NodeIdRevs, ReplyCount, DoRepair) ->
     Docs = case IsTree of
         true -> tree_repair_docs(Replies, DoRepair);
         false -> dict_repair_docs(Replies, ReplyCount)
@@ -197,11 +202,7 @@ maybe_read_repair(Db, IsTree, Replies, NodeReplies0, ReplyCount, DoRepair) ->
         [] ->
             ok;
         _ ->
-            NodeReplies = lists:foldl(fun({Node, NReplies}, Acc) ->
-                NewAcc = [{Node, Doc} || {ok, Doc} <- NReplies],
-                NewAcc ++ Acc
-            end, [], NodeReplies0),
-            erlang:spawn(fun() -> read_repair(Db, Docs, NodeReplies) end)
+            erlang:spawn(fun() -> read_repair(Db, Docs, NodeIdRevs) end)
     end.
 
 
@@ -220,8 +221,8 @@ dict_repair_docs(Replies, ReplyCount) ->
     end.
 
 
-read_repair(Db, Docs, NodeReplies) ->
-    Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeReplies}],
+read_repair(Db, Docs, NodeIdRevs) ->
+    Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeIdRevs}],
     Res = fabric:update_docs(Db, Docs, Opts),
     case Res of
         {ok, []} ->
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 91fbb9e..d1d33e9 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -351,7 +351,8 @@ read_repair_filter_docs(DbName, NodeIdRevs, Docs, Options) ->
 % NodeIdRevs are the list of {node(), {docid(), [rev()]}}
 % tuples passed as the read_repair option to update_docs.
 filter_purged_revs(Db, NodeIdRevs, Docs) ->
-    Nodes = lists:usort([Node || {Node, _IdRevs} <- NodeIdRevs]),
+    Nodes0 = [Node || {Node, _IdRevs} <- NodeIdRevs, Node /= node()],
+    Nodes = lists:usort(Nodes0),
 
     % Gather the list of {Node, PurgeSeq} pairs for all nodes
     % that are present in our read repair group

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.