You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/22 21:17:11 UTC
[couchdb] 02/02: WIP - read repair
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 7a51482229d4ec5d609422d827c33c199510734b
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Mar 22 16:01:58 2018 -0500
WIP - read repair
---
src/fabric/src/fabric_doc_open.erl | 18 +++++++++++-------
src/fabric/src/fabric_doc_open_revs.erl | 31 ++++++++++++++++---------------
src/fabric/src/fabric_rpc.erl | 3 ++-
3 files changed, 29 insertions(+), 23 deletions(-)
diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
index b974880..445cfbb 100644
--- a/src/fabric/src/fabric_doc_open.erl
+++ b/src/fabric/src/fabric_doc_open.erl
@@ -25,8 +25,8 @@
r,
state,
replies,
- q_reply,
- replies_by_node=[] %[{Node, Reply}] used for checking if a doc is purged
+ node_id_revs = [],
+ q_reply
}).
@@ -84,8 +84,13 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
end;
handle_message(Reply, Worker, Acc) ->
NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
- NewNReplies = [{Worker#shard.node, Reply}|Acc#acc.replies_by_node],
- NewAcc = Acc#acc{replies = NewReplies, replies_by_node = NewNReplies},
+ NewNodeIdRevs = case Reply of % {Node, {DocId, [Rev]}} entries for the read_repair option
+ {ok, #doc{id = Id, revs = {Pos, [Rev | _]}}} ->
+ [{Worker#shard.node, {Id, [{Pos, Rev}]}} | Acc#acc.node_id_revs];
+ _ ->
+ Acc#acc.node_id_revs
+ end,
+ NewAcc = Acc#acc{replies = NewReplies, node_id_revs = NewNodeIdRevs},
case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
{true, QuorumReply} ->
fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -124,15 +129,14 @@ is_r_met(Workers, Replies, R) ->
no_more_workers
end.
-read_repair(#acc{dbname=DbName, replies=Replies, replies_by_node=NReplies0}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, node_id_revs=NodeIdRevs}) ->
Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
- NReplies = [{Node, Doc} || {Node, {ok, #doc{}=Doc}} <- NReplies0],
case Docs of
% omit local docs from read repair
[#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
choose_reply(Docs);
[#doc{id=Id} | _] ->
- Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NReplies}],
+ Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeIdRevs}],
Res = fabric:update_docs(DbName, Docs, Opts),
case Res of
{ok, []} ->
diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl
index 61e8afd..a020ba6 100644
--- a/src/fabric/src/fabric_doc_open_revs.erl
+++ b/src/fabric/src/fabric_doc_open_revs.erl
@@ -29,8 +29,8 @@
revs,
latest,
replies = [],
repair = false,
- replies_by_node=[] %[{Node, Reply}] used for read_repair
+ node_id_revs = [] % [{Node, {DocId, [Rev]}}] used for read_repair
}).
go(DbName, Id, Revs, Options) ->
@@ -83,7 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
worker_count = WorkerCount,
workers = Workers,
replies = PrevReplies,
- replies_by_node = PrevNReplies,
+ node_id_revs = PrevNodeIdRevs,
r = R,
revs = Revs,
latest = Latest,
@@ -109,9 +109,14 @@ handle_message({ok, RawReplies}, Worker, State) ->
{NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies),
{NewReplies0, MinCount >= R, false}
end,
- NewNReplies = case Worker of
- nil -> PrevNReplies;
- _ -> [{Worker#shard.node, RawReplies}|PrevNReplies]
+ NewNodeIdRevs = case Worker of
+ nil ->
+ PrevNodeIdRevs;
+ _ ->
+ % One {Node, {DocId, [Rev]}} entry per {ok, #doc{}} reply, the shape
+ % fabric_rpc:filter_purged_revs expects; any other reply is skipped.
+ [{Worker#shard.node, {DocId, [{RevPos, RevId}]}} || {ok, #doc{id = DocId,
+ revs = {RevPos, [RevId | _]}}} <- RawReplies] ++ PrevNodeIdRevs
end,
Complete = (ReplyCount =:= (WorkerCount - 1)),
@@ -123,7 +128,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
DbName,
IsTree,
NewReplies,
- NewNReplies,
+ NewNodeIdRevs,
ReplyCount + 1,
InRepair orelse Repair
),
@@ -131,7 +136,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
false ->
{ok, State#state{
replies = NewReplies,
- replies_by_node = NewNReplies,
+ node_id_revs = NewNodeIdRevs,
reply_count = ReplyCount + 1,
workers = lists:delete(Worker, Workers),
repair = InRepair orelse Repair
@@ -188,7 +193,7 @@ dict_replies(Dict, [Reply | Rest]) ->
dict_replies(NewDict, Rest).
-maybe_read_repair(Db, IsTree, Replies, NodeReplies0, ReplyCount, DoRepair) ->
+maybe_read_repair(Db, IsTree, Replies, NodeIdRevs, ReplyCount, DoRepair) ->
Docs = case IsTree of
true -> tree_repair_docs(Replies, DoRepair);
false -> dict_repair_docs(Replies, ReplyCount)
@@ -197,11 +202,7 @@ maybe_read_repair(Db, IsTree, Replies, NodeReplies0, ReplyCount, DoRepair) ->
[] ->
ok;
_ ->
- NodeReplies = lists:foldl(fun({Node, NReplies}, Acc) ->
- NewAcc = [{Node, Doc} || {ok, Doc} <- NReplies],
- NewAcc ++ Acc
- end, [], NodeReplies0),
- erlang:spawn(fun() -> read_repair(Db, Docs, NodeReplies) end)
+ erlang:spawn(fun() -> read_repair(Db, Docs, NodeIdRevs) end)
end.
@@ -220,8 +221,8 @@ dict_repair_docs(Replies, ReplyCount) ->
end.
-read_repair(Db, Docs, NodeReplies) ->
- Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeReplies}],
+read_repair(Db, Docs, NodeIdRevs) ->
+ Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeIdRevs}],
Res = fabric:update_docs(Db, Docs, Opts),
case Res of
{ok, []} ->
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 91fbb9e..d1d33e9 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -351,7 +351,8 @@ read_repair_filter_docs(DbName, NodeIdRevs, Docs, Options) ->
% NodeIdRevs are the list of {node(), {docid(), [rev()]}}
% tuples passed as the read_repair option to update_docs.
filter_purged_revs(Db, NodeIdRevs, Docs) ->
- Nodes = lists:usort([Node || {Node, _IdRevs} <- NodeIdRevs]),
+ Nodes0 = [Node || {Node, _IdRevs} <- NodeIdRevs, Node /= node()],
+ Nodes = lists:usort(Nodes0),
% Gather the list of {Node, PurgeSeq} pairs for all nodes
% that are present in our read repair group
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.