You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/21 17:26:39 UTC

[couchdb] 04/08: Update fabric_doc_open_revs to handle purges

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit ca2867ae9ac648f426ef18466b646cfe8a207882
Author: Mayya Sharipova <ma...@ca.ibm.com>
AuthorDate: Tue Oct 3 15:56:55 2017 -0400

    Update fabric_doc_open_revs to handle purges
    
    We need to account for the possibility that a document is opened while a
    purge is still propogating between shards. This means we need to inform
    the read-repair algorithms that a difference in revisions may be due to
    a purge request in progress. If we don't do this its possible that a
    read-repair may race the purge request and effectively undo the purge.
---
 src/fabric/src/fabric_doc_open_revs.erl | 148 ++++++++++++++++++++++----------
 src/fabric/src/fabric_rpc.erl           |  67 ++++++++-------
 2 files changed, 137 insertions(+), 78 deletions(-)

diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl
index 096722f..dbe02bf 100644
--- a/src/fabric/src/fabric_doc_open_revs.erl
+++ b/src/fabric/src/fabric_doc_open_revs.erl
@@ -29,7 +29,8 @@
     revs,
     latest,
     replies = [],
-    repair = false
+    repair = false,
+    replies_by_node=[] %[{Node, Reply}] used for read_repair
 }).
 
 go(DbName, Id, Revs, Options) ->
@@ -82,6 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         worker_count = WorkerCount,
         workers = Workers,
         replies = PrevReplies,
+        replies_by_node = PrevNReplies,
         r = R,
         revs = Revs,
         latest = Latest,
@@ -92,13 +94,14 @@ handle_message({ok, RawReplies}, Worker, State) ->
     IsTree = Revs == all orelse Latest,
 
     % Do not count error replies when checking quorum
-
     RealReplyCount = ReplyCount + 1 - ReplyErrorCount,
     QuorumReplies = RealReplyCount >= R,
     {NewReplies, QuorumMet, Repair} = case IsTree of
         true ->
-            {NewReplies0, AllInternal, Repair0} =
+            {NewReplies0, AllInternal, Repair00} =
                     tree_replies(PrevReplies, tree_sort(RawReplies)),
+            % don't set Repair=true on the first reply
+            Repair0 = (ReplyCount > 0) and Repair00,
             NumLeafs = couch_key_tree:count_leafs(PrevReplies),
             SameNumRevs = length(RawReplies) == NumLeafs,
             QMet = AllInternal andalso SameNumRevs andalso QuorumReplies,
@@ -107,6 +110,10 @@ handle_message({ok, RawReplies}, Worker, State) ->
             {NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies),
             {NewReplies0, MinCount >= R, false}
     end,
+    NewNReplies = case Worker of
+        nil -> PrevNReplies;
+        _ -> [{Worker#shard.node, RawReplies}|PrevNReplies]
+    end,
 
     Complete = (ReplyCount =:= (WorkerCount - 1)),
 
@@ -117,6 +124,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
                     DbName,
                     IsTree,
                     NewReplies,
+                    NewNReplies,
                     ReplyCount + 1,
                     InRepair orelse Repair
                 ),
@@ -124,6 +132,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         false ->
             {ok, State#state{
                 replies = NewReplies,
+                replies_by_node = NewNReplies,
                 reply_count = ReplyCount + 1,
                 workers = lists:delete(Worker, Workers),
                 repair = InRepair orelse Repair
@@ -180,7 +189,7 @@ dict_replies(Dict, [Reply | Rest]) ->
     dict_replies(NewDict, Rest).
 
 
-maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
+maybe_read_repair(Db, IsTree, Replies, NodeReplies0, ReplyCount, DoRepair) ->
     Docs = case IsTree of
         true -> tree_repair_docs(Replies, DoRepair);
         false -> dict_repair_docs(Replies, ReplyCount)
@@ -189,7 +198,11 @@ maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
         [] ->
             ok;
         _ ->
-            erlang:spawn(fun() -> read_repair(Db, Docs) end)
+            NodeReplies = lists:foldl(fun({Node, NReplies}, Acc) ->
+                NewAcc = [{Node, Doc} || {ok, Doc} <- NReplies],
+                NewAcc ++ Acc
+            end, [], NodeReplies0),
+            erlang:spawn(fun() -> read_repair(Db, Docs, NodeReplies) end)
     end.
 
 
@@ -208,8 +221,9 @@ dict_repair_docs(Replies, ReplyCount) ->
     end.
 
 
-read_repair(Db, Docs) ->
-    Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]),
+read_repair(Db, Docs, NodeReplies) ->
+    Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeReplies}],
+    Res = fabric:update_docs(Db, Docs, Opts),
     case Res of
         {ok, []} ->
             couch_stats:increment_counter([fabric, read_repairs, success]);
@@ -268,20 +282,24 @@ filter_reply(Replies) ->
 setup() ->
     config:start_link([]),
     meck:new([fabric, couch_stats, couch_log]),
+    meck:new(fabric_util, [passthrough]),
     meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
     meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
-    meck:expect(couch_log, notice, fun(_, _) -> ok end).
+    meck:expect(couch_log, notice, fun(_, _) -> ok end),
+    meck:expect(fabric_util, cleanup, fun(_) -> ok end).
+
 
 
 teardown(_) ->
-    (catch meck:unload([fabric, couch_stats, couch_log])),
+    (catch meck:unload([fabric, couch_stats, couch_log, fabric_util])),
     config:stop().
 
 
 state0(Revs, Latest) ->
     #state{
         worker_count = 3,
-        workers = [w1, w2, w3],
+        workers =
+            [#shard{node='node1'}, #shard{node='node2'}, #shard{node='node3'}],
         r = 2,
         revs = Revs,
         latest = Latest
@@ -334,27 +352,35 @@ open_doc_revs_test_() ->
 check_empty_response_not_quorum() ->
     % Simple smoke test that we don't think we're
     % done with a first empty response
+    W1 = #shard{node='node1'},
+    W2 = #shard{node='node2'},
+    W3 = #shard{node='node3'},
     ?_assertMatch(
-        {ok, #state{workers = [w2, w3]}},
-        handle_message({ok, []}, w1, state0(all, false))
+        {ok, #state{workers = [W2, W3]}},
+        handle_message({ok, []}, W1, state0(all, false))
     ).
 
 
 check_basic_response() ->
     % Check that we've handle a response
+    W1 = #shard{node='node1'},
+    W2 = #shard{node='node2'},
+    W3 = #shard{node='node3'},
     ?_assertMatch(
-        {ok, #state{reply_count = 1, workers = [w2, w3]}},
-        handle_message({ok, [foo1(), bar1()]}, w1, state0(all, false))
+        {ok, #state{reply_count = 1, workers = [W2, W3]}},
+        handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false))
     ).
 
 
 check_finish_quorum() ->
     % Two messages with the same revisions means we're done
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         Expect = {stop, [bar1(), foo1()]},
-        ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, w2, S1))
+        ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1))
     end).
 
 
@@ -363,11 +389,13 @@ check_finish_quorum_newer() ->
     % foo1 should count for foo2 which means we're finished.
     % We also validate that read_repair was triggered.
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         Expect = {stop, [bar1(), foo2()]},
         ok = meck:reset(fabric),
-        ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, w2, S1)),
+        ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)),
         ok = meck:wait(fabric, update_docs, '_', 5000),
         ?assertMatch(
             [{_, {fabric, update_docs, [_, _, _]}, _}],
@@ -380,11 +408,14 @@ check_no_quorum_on_second() ->
     % Quorum not yet met for the foo revision so we
     % would wait for w3
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         ?assertMatch(
-            {ok, #state{workers = [w3]}},
-            handle_message({ok, [bar1()]}, w2, S1)
+            {ok, #state{workers = [W3]}},
+            handle_message({ok, [bar1()]}, W2, S1)
         )
     end).
 
@@ -394,11 +425,14 @@ check_done_on_third() ->
     % what. Every revision seen in this pattern should be
     % included.
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
-        {ok, S2} = handle_message({ok, [bar1()]}, w2, S1),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
+        {ok, S2} = handle_message({ok, [bar1()]}, W2, S1),
         Expect = {stop, [bar1(), foo1()]},
-        ?assertEqual(Expect, handle_message({ok, [bar1()]}, w3, S2))
+        ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2))
     end).
 
 
@@ -407,108 +441,128 @@ check_done_on_third() ->
 
 check_specific_revs_first_msg() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), false),
         ?assertMatch(
-            {ok, #state{reply_count = 1, workers = [w2, w3]}},
-            handle_message({ok, [foo1(), bar1(), bazNF()]}, w1, S0)
+            {ok, #state{reply_count = 1, workers = [W2, W3]}},
+            handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0)
         )
     end).
 
 
 check_revs_done_on_agreement() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), false),
         Msg = {ok, [foo1(), bar1(), bazNF()]},
-        {ok, S1} = handle_message(Msg, w1, S0),
+        {ok, S1} = handle_message(Msg, W1, S0),
         Expect = {stop, [bar1(), foo1(), bazNF()]},
-        ?assertEqual(Expect, handle_message(Msg, w2, S1))
+        ?assertEqual(Expect, handle_message(Msg, W2, S1))
     end).
 
 
 check_latest_true() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo2(), bar1(), bazNF()]},
         Msg2 = {ok, [foo2(), bar1(), bazNF()]},
-        {ok, S1} = handle_message(Msg1, w1, S0),
+        {ok, S1} = handle_message(Msg1, W1, S0),
         Expect = {stop, [bar1(), foo2(), bazNF()]},
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1))
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1))
     end).
 
 
 check_ancestor_counted_in_quorum() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), bazNF()]},
         Msg2 = {ok, [foo2(), bar1(), bazNF()]},
         Expect = {stop, [bar1(), foo2(), bazNF()]},
 
         % Older first
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
 
         % Newer first
-        {ok, S2} = handle_message(Msg2, w2, S0),
-        ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+        {ok, S2} = handle_message(Msg2, W2, S0),
+        ?assertEqual(Expect, handle_message(Msg1, W1, S2))
     end).
 
 
 check_not_found_counts_for_descendant() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), bazNF()]},
         Msg2 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
         % not_found first
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
 
         % not_found second
-        {ok, S2} = handle_message(Msg2, w2, S0),
-        ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+        {ok, S2} = handle_message(Msg2, W2, S0),
+        ?assertEqual(Expect, handle_message(Msg1, W1, S2))
     end).
 
 
 check_worker_error_skipped() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), baz1()]},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
 check_quorum_only_counts_valid_responses() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {rexi_EXIT, reason},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
 check_empty_list_when_no_workers_reply() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {rexi_EXIT, reason},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil},
         Expect = {stop, all_workers_died},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 6e2c05f..2c4d5f4 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -344,48 +344,53 @@ update_docs_read_repair(DbName, DocsByNode, Options) ->
 
 % given [{Node, Doc}] diff revs of the same DocID from diff nodes
 % returns [Doc] filtering out purged docs.
-% This is done for read-repair from fabric_doc_open,
+% This is done for read-repair from fabric_doc_open or fabric_doc_open_revs,
 % so that not to recreate Docs that have been purged before
 % on this node() from Nodes that are out of sync.
 filter_purged_revs(Db, DocsByNode) ->
-    AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
-    {ok, DbPSeq} = couch_db:get_purge_seq(Db),
-    PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) ->  [{Id, Revs}|Acc]  end,
+    % go through _local/purge-mem3-.. docs
+    % and assemble NodePSeqs = [{Node1, NodePSeq1}, ...]
+    % NodePSeq1 - purge_seq of this node known to Node1
     V = "v" ++ config:get("purge", "version", "1") ++ "-",
     StartKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++  V ++ "mem3-"),
     EndKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem31"),
     Opts = [{start_key, StartKey}, {end_key_gt, EndKey}],
-    % go through _local/purge-mem3-.. docs
-    % find Node that this LDoc corresponds to
-    % check if update from Node has not been recently purged on current node
     LDocsFoldFun = fun(#doc{body={Props}}, Acc) ->
         {VOps} = couch_util:get_value(<<"verify_options">>, Props),
         Node = couch_util:get_value(<<"node">>, VOps),
-        Result = lists:keyfind(Node, 1, DocsByNode),
-        NewAcc = if not Result -> Acc; true ->
-            {Node, Doc} = Result,
-            NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
-            if  NodePSeq == DbPSeq ->
-                    [Doc|Acc];
-                (NodePSeq+AllowedPSeqLag) < DbPSeq ->
-                    % Node is very out of sync, ignore updates from it
-                    Acc;
-                true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq
-                    % if Doc has been purged recently, than ignore it
-                    {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
-                            NodePSeq, PurgeFoldFun, [], []),
-                    {Start, [FirstRevId|_]} = Doc#doc.revs,
-                    DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
-                    case lists:member(DocIdRevs, PurgedIdsRevs) of
-                        true -> Acc;
-                        false -> [Doc|Acc]
-                    end
-            end
-        end,
-        {ok, NewAcc}
+        NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
+        {ok, [{Node, NodePSeq} | Acc]}
     end,
-    {ok, Docs} = couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
-    Docs.
+    {ok, NodePSeqs} =
+        couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
+
+    % go through all doc_updates and
+    % filter out updates from nodes that are behind in purges synchronization
+    AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
+    {ok, DbPSeq} = couch_db:get_purge_seq(Db),
+    PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) ->  [{Id, Revs}|Acc]  end,
+    lists:foldl(fun({Node, Doc}, Docs) ->
+        NodePSeq = case lists:keyfind(Node, 1, NodePSeqs) of
+           {Node, NodePSeq0} -> NodePSeq0;
+           false -> 0
+        end,
+        if  NodePSeq == DbPSeq ->
+            [Doc|Docs];
+        (NodePSeq+AllowedPSeqLag) < DbPSeq ->
+            % Node is very out of sync, ignore updates from it
+            Docs;
+        true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq
+            % if Doc has been purged recently -> ignore it
+            {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
+                    NodePSeq, PurgeFoldFun, [], []),
+            {Start, [FirstRevId|_]} = Doc#doc.revs,
+            DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
+            case lists:member(DocIdRevs, PurgedIdsRevs) of
+                true -> Docs;
+                false -> [Doc|Docs]
+            end
+        end
+    end, [], DocsByNode).
 
 
 get_or_create_db(DbName, Options) ->

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.