Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/21 18:26:20 UTC

[couchdb] 02/07: Implement clustered purge API

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 509548f0c6f3525a560d356af40d03057318c2c6
Author: Mayya Sharipova <ma...@ca.ibm.com>
AuthorDate: Mon May 1 15:43:43 2017 -0400

    Implement clustered purge API
    
    * Implement clustered endpoint to purge docs on all nodes
     - implement fabric:purge_docs(DbName, IdsRevs, Options)
     - generate a unique ID ("UUID") for every purge request before
        sending it to the workers on the relevant nodes.
     - fabric_rpc:purge_docs takes an option of "replicated_changes"
        or "interactive_edit" that is passed to couch_db:purge_docs/3.
        This is done so that "replicated_changes" updates will not
        reapply purges with UUIDs that already exist on a node.
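     - a rough usage sketch (the db name, doc id and rev below are
        hypothetical; options are left empty here):

            IdsRevs = [{<<"some_doc">>, [{1, <<"rev1">>}]}],
            case fabric:purge_docs(<<"some_db">>, IdsRevs, []) of
                {ok, Results} -> Results;        % every doc met the w quorum
                {accepted, Results} -> Results   % some docs purged on fewer than w shards
            end.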
    
    * Fix read-repair
     - fix read-repair so that it does not recreate docs that were
        previously purged on a node using copies from nodes that are
        out of sync.
     - In the open-doc calls we track which nodes sent which revisions. If we
        detect the need for read repair, we send this list of (node, rev) pairs
        as an option to the fabric:update_docs call. When fabric:update_docs
        receives a list of (node, rev) pairs, it uses this information to decide
        whether it should apply each update or ignore it. It checks the
        _local/purge-mem3.. docs to see whether the purge_seq is up to date;
        if not, it ignores the update request. As an optimization, if the
        purge_seq is out of sync by less than a configurable limit, the
        updater sequentially scans the purge_seq tree looking for purge
        requests for the given revision and, if none is found, continues
        with the write.
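     - a minimal sketch of the option shape (the node name, doc id and
        rev are hypothetical; #doc{} and ?ADMIN_CTX come from
        couch/include/couch_db.hrl; the pairs carry #doc{} records, as
        in the fabric_doc_open change below):

            Doc = #doc{id = <<"some_doc">>, revs = {1, [<<"rev1">>]}},
            NodeDocs = [{'node1@127.0.0.1', Doc}],
            Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NodeDocs}],
            fabric:update_docs(<<"some_db">>, [Doc], Opts).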
    
    * Implement clustered endpoint to set purged_docs_limit of Db on all nodes
     - implement fabric:set_purged_docs_limit(DbName, Limit, Options)
    
    * Implement clustered endpoint to get purged_docs_limit
     - implement fabric:get_purged_docs_limit(DbName)
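     - e.g., for both endpoints (hypothetical db name and limit;
        ?ADMIN_CTX comes from couch/include/couch_db.hrl):

            ok = fabric:set_purged_docs_limit(<<"some_db">>, 10000, [?ADMIN_CTX]),
            Limit = fabric:get_purged_docs_limit(<<"some_db">>).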
    
    COUCHDB-3326
---
 src/fabric/src/fabric.erl           |  35 ++-
 src/fabric/src/fabric_db_info.erl   |  29 +--
 src/fabric/src/fabric_db_meta.erl   |  26 ++-
 src/fabric/src/fabric_doc_open.erl  |  42 ++--
 src/fabric/src/fabric_doc_purge.erl | 414 ++++++++++++++++++++++++++++++++++++
 src/fabric/src/fabric_rpc.erl       | 102 ++++++++-
 6 files changed, 612 insertions(+), 36 deletions(-)

diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 4a07271..7221654 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -21,12 +21,13 @@
     delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
     set_security/2, set_security/3, get_revs_limit/1, get_security/1,
     get_security/2, get_all_security/1, get_all_security/2,
+    get_purged_docs_limit/1, set_purged_docs_limit/3,
     compact/1, compact/2]).
 
 % Documents
 -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
     get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3,
-    purge_docs/2, att_receiver/2]).
+    purge_docs/3, att_receiver/2]).
 
 % Views
 -export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
@@ -137,6 +138,18 @@ set_security(DbName, SecObj) ->
 set_security(DbName, SecObj, Options) ->
     fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
 
+%% @doc sets the upper bound for the number of stored purge requests
+-spec set_purged_docs_limit(dbname(), pos_integer(), [option()]) -> ok.
+set_purged_docs_limit(DbName, Limit, Options)
+        when is_integer(Limit), Limit > 0 ->
+    fabric_db_meta:set_purged_docs_limit(dbname(DbName), Limit, opts(Options)).
+
+%% @doc retrieves the upper bound for the number of stored purge requests
+-spec get_purged_docs_limit(dbname()) -> pos_integer() | no_return().
+get_purged_docs_limit(DbName) ->
+    {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+    try couch_db:get_purged_docs_limit(Db) after catch couch_db:close(Db) end.
+
 get_security(DbName) ->
     get_security(DbName, [?ADMIN_CTX]).
 
@@ -267,8 +280,24 @@ update_docs(DbName, Docs, Options) ->
         {aborted, PreCommitFailures}
     end.
 
-purge_docs(_DbName, _IdsRevs) ->
-    not_implemented.
+
+%% @doc purge revisions for a list '{Id, Revs}'
+%%      returns {ok, {PurgeSeq, Results}}
+-spec purge_docs(dbname(), [{docid(), [revision()]}], [option()]) ->
+    {Health, {PurgeSeq, [{Health, [revision()]}] }} when
+    Health     :: ok | accepted,
+    PurgeSeq   :: any().
+purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+    IdsRevs2 = [idrevs(IdRs) || IdRs <- IdsRevs],
+    case fabric_doc_purge:go(dbname(DbName), IdsRevs2, opts(Options)) of
+        {ok, Results} ->
+            {ok, Results};
+        {accepted, Results} ->
+            {accepted, Results};
+        Error ->
+            throw(Error)
+    end.
+
 
 %% @doc spawns a process to upload attachment data and
 %%      returns a function that shards can use to communicate
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index 98e8e52..97a31c2 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -23,10 +23,12 @@ go(DbName) ->
     RexiMon = fabric_util:create_monitors(Shards),
     Fun = fun handle_message/3,
     {ok, ClusterInfo} = get_cluster_info(Shards),
-    Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]},
+    Acc0 = {fabric_dict:init(Workers, nil), [], [{cluster, ClusterInfo}]},
     try
         case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
-            {ok, Acc} -> {ok, Acc};
+
+            {ok, Acc} ->
+                {ok, Acc};
             {timeout, {WorkersDict, _}} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
                     WorkersDict,
@@ -37,44 +39,49 @@ go(DbName) ->
                     "get_db_info"
                 ),
                 {error, timeout};
-            {error, Error} -> throw(Error)
+            {error, Error} ->
+                throw(Error)
         end
     after
         rexi_monitor:stop(RexiMon)
     end.
 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+handle_message({rexi_DOWN,
+        _, {_,NodeRef},_}, _Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_util:remove_down_workers(Counters, NodeRef) of
     {ok, NewCounters} ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     error ->
         {error, {nodedown, <<"progress not possible">>}}
     end;
 
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, PseqAcc, Acc}) ->
     NewCounters = fabric_dict:erase(Shard, Counters),
     case fabric_view:is_progress_possible(NewCounters) of
     true ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     false ->
         {error, Reason}
     end;
 
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_dict:lookup_element(Shard, Counters) of
     undefined ->
         % already heard from someone else in this range
-        {ok, {Counters, Acc}};
+        {ok, {Counters, PseqAcc, Acc}};
     nil ->
         Seq = couch_util:get_value(update_seq, Info),
         C1 = fabric_dict:store(Shard, Seq, Counters),
         C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+        PSeq = couch_util:get_value(purge_seq, Info),
+        NewPseqAcc = [{Shard, PSeq}|PseqAcc],
         case fabric_dict:any(nil, C2) of
         true ->
-            {ok, {C2, [Info|Acc]}};
+            {ok, {C2, NewPseqAcc, [Info|Acc]}};
         false ->
             {stop, [
                 {db_name,Name},
+                {purge_seq, fabric_view_changes:pack_seqs(NewPseqAcc)},
                 {update_seq, fabric_view_changes:pack_seqs(C2)} |
                 merge_results(lists:flatten([Info|Acc]))
             ]}
@@ -91,8 +98,6 @@ merge_results(Info) ->
             [{doc_count, lists:sum(X)} | Acc];
         (doc_del_count, X, Acc) ->
             [{doc_del_count, lists:sum(X)} | Acc];
-        (purge_seq, X, Acc) ->
-            [{purge_seq, lists:sum(X)} | Acc];
         (compact_running, X, Acc) ->
             [{compact_running, lists:member(true, X)} | Acc];
         (disk_size, X, Acc) -> % legacy
diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl
index 367ef06..4fd9365 100644
--- a/src/fabric/src/fabric_db_meta.erl
+++ b/src/fabric/src/fabric_db_meta.erl
@@ -12,7 +12,8 @@
 
 -module(fabric_db_meta).
 
--export([set_revs_limit/3, set_security/3, get_all_security/2]).
+-export([set_revs_limit/3, set_security/3, get_all_security/2,
+    set_purged_docs_limit/3]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -48,6 +49,29 @@ handle_revs_message(Error, _, _Acc) ->
     {error, Error}.
 
 
+set_purged_docs_limit(DbName, Limit, Options) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, set_purged_docs_limit, [Limit, Options]),
+    Handler = fun handle_purge_message/3,
+    Acc0 = {Workers, length(Workers) - 1},
+    case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of
+        {ok, ok} ->
+            ok;
+        {timeout, {DefunctWorkers, _}} ->
+            fabric_util:log_timeout(DefunctWorkers, "set_purged_docs_limit"),
+            {error, timeout};
+        Error ->
+            Error
+    end.
+
+handle_purge_message(ok, _, {_Workers, 0}) ->
+    {stop, ok};
+handle_purge_message(ok, Worker, {Workers, Waiting}) ->
+    {ok, {lists:delete(Worker, Workers), Waiting - 1}};
+handle_purge_message(Error, _, _Acc) ->
+    {error, Error}.
+
+
 set_security(DbName, SecObj, Options) ->
     Shards = mem3:shards(DbName),
     RexiMon = fabric_util:create_monitors(Shards),
diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
index 9c45bd9..b974880 100644
--- a/src/fabric/src/fabric_doc_open.erl
+++ b/src/fabric/src/fabric_doc_open.erl
@@ -25,7 +25,8 @@
     r,
     state,
     replies,
-    q_reply
+    q_reply,
+    replies_by_node=[] %[{Node, Reply}] used for checking if a doc is purged
 }).
 
 
@@ -83,7 +84,8 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
     end;
 handle_message(Reply, Worker, Acc) ->
     NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
-    NewAcc = Acc#acc{replies = NewReplies},
+    NewNReplies = [{Worker#shard.node, Reply}|Acc#acc.replies_by_node],
+    NewAcc = Acc#acc{replies = NewReplies, replies_by_node = NewNReplies},
     case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
     {true, QuorumReply} ->
         fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -122,14 +124,15 @@ is_r_met(Workers, Replies, R) ->
         no_more_workers
     end.
 
-read_repair(#acc{dbname=DbName, replies=Replies}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, replies_by_node=NReplies0}) ->
     Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
+    NReplies = [{Node, Doc} || {Node, {ok, #doc{}=Doc}} <- NReplies0],
     case Docs of
     % omit local docs from read repair
     [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
         choose_reply(Docs);
     [#doc{id=Id} | _] ->
-        Opts = [replicated_changes, ?ADMIN_CTX],
+        Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NReplies}],
         Res = fabric:update_docs(DbName, Docs, Opts),
         case Res of
             {ok, []} ->
@@ -319,7 +322,8 @@ handle_message_reply_test() ->
     ?assertEqual(
         {ok, Acc0#acc{
             workers=[Worker0, Worker1],
-            replies=[fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(foo,1)],
+            replies_by_node=[{undefined, foo}]
         }},
         handle_message(foo, Worker2, Acc0)
     ),
@@ -327,7 +331,8 @@ handle_message_reply_test() ->
     ?assertEqual(
         {ok, Acc0#acc{
             workers=[Worker0, Worker1],
-            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
+            replies_by_node=[{undefined, bar}]
         }},
         handle_message(bar, Worker2, Acc0#acc{
             replies=[fabric_util:kv(foo,1)]
@@ -339,18 +344,21 @@ handle_message_reply_test() ->
     % is returned. Bit subtle on the assertions here.
 
     ?assertEqual(
-        {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)]}},
+        {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)],
+            replies_by_node=[{undefined, foo}]}},
         handle_message(foo, Worker0, Acc0#acc{workers=[Worker0]})
     ),
 
     ?assertEqual(
         {stop, Acc0#acc{
             workers=[],
-            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
+            replies_by_node =[{undefined, bar}, {undefined, foo}]
         }},
         handle_message(bar, Worker0, Acc0#acc{
             workers=[Worker0],
-            replies=[fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(foo,1)],
+            replies_by_node=[{undefined, foo}]
         })
     ),
 
@@ -362,11 +370,13 @@ handle_message_reply_test() ->
             workers=[],
             replies=[fabric_util:kv(foo,2)],
             state=r_met,
-            q_reply=foo
+            q_reply=foo,
+            replies_by_node =[{undefined, foo}, {undefined, foo}]
         }},
         handle_message(foo, Worker1, Acc0#acc{
             workers=[Worker0, Worker1],
-            replies=[fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(foo,1)],
+            replies_by_node =[{undefined, foo}]
         })
     ),
 
@@ -376,7 +386,8 @@ handle_message_reply_test() ->
             r=1,
             replies=[fabric_util:kv(foo,1)],
             state=r_met,
-            q_reply=foo
+            q_reply=foo,
+            replies_by_node =[{undefined, foo}]
         }},
         handle_message(foo, Worker0, Acc0#acc{r=1})
     ),
@@ -386,11 +397,14 @@ handle_message_reply_test() ->
             workers=[],
             replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,2)],
             state=r_met,
-            q_reply=foo
+            q_reply=foo,
+            replies_by_node =[{undefined, foo}, {undefined, foo},
+                {undefined, bar}]
         }},
         handle_message(foo, Worker0, Acc0#acc{
             workers=[Worker0],
-            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
+            replies_by_node =[{undefined, foo}, {undefined, bar}]
         })
     ),
 
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl
new file mode 100644
index 0000000..24e8c66
--- /dev/null
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -0,0 +1,414 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_purge).
+
+-export([go/3]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(_, [], _) ->
+    {ok, []};
+go(DbName, AllIdsRevs, Opts) ->
+    % tag each purge request with a UUID
+    {AllUUIDs, AllUUIDsIdsRevs, DocCount} = tag_docs(AllIdsRevs),
+
+    Options = lists:delete(all_or_nothing, Opts),
+    % Counters -> [{Worker, UUIDs}]
+    {Counters, Workers} = dict:fold(fun(Shard, UUIDsIdsRevs, {Cs,Ws}) ->
+        UUIDs = [UUID || {UUID, _Id, _Revs} <-UUIDsIdsRevs],
+        #shard{name=Name, node=Node} = Shard,
+        Ref = rexi:cast(Node,
+            {fabric_rpc, purge_docs, [Name, UUIDsIdsRevs, Options]}),
+        Worker = Shard#shard{ref=Ref},
+        {[{Worker, UUIDs}|Cs], [Worker|Ws]}
+    end, {[], []}, group_idrevs_by_shard(DbName, AllUUIDsIdsRevs)),
+
+    RexiMon = fabric_util:create_monitors(Workers),
+    W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
+    Acc = {length(Workers), DocCount, list_to_integer(W), Counters, dict:new()},
+    Timeout = fabric_util:request_timeout(),
+    try rexi_utils:recv(Workers, #shard.ref,
+        fun handle_message/3, Acc, infinity, Timeout) of
+    {ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
+        % Results-> [{UUID, {ok, Revs}}]
+        {Health, [R || R <-
+            couch_util:reorder_results(AllUUIDs, Results)]};
+    {timeout, Acc1} ->
+        {_, _, W1, Counters1, DocReplDict0} = Acc1,
+        {DefunctWorkers, _} = lists:unzip(Counters1),
+        fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+        DocReplDict = lists:foldl(fun({_W, Docs}, Dict) ->
+            Replies = [{error, timeout} || _D <- Docs],
+            append_purge_replies(Docs, Replies, Dict)
+        end, DocReplDict0, Counters1),
+        {Health, _, Resp} = dict:fold(
+            fun force_reply/3, {ok, W1, []}, DocReplDict),
+        case Health of
+            error -> timeout;
+            _ -> {Health, [R || R <-
+                couch_util:reorder_results(AllUUIDs, Resp)]}
+
+        end;
+    Else ->
+        Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) ->
+    {_, DocCount, W, Counters, DocsDict0} = Acc0,
+    {FailCounters, NewCounters} = lists:partition(fun({#shard{node=N}, _}) ->
+        N == NodeRef
+    end, Counters),
+    % fill DocsDict with error messages for relevant Docs
+    DocsDict = lists:foldl(fun({_W, Docs}, CDocsDict) ->
+        Replies = [{error, internal_server_error} || _D <- Docs],
+        append_purge_replies(Docs, Replies, CDocsDict)
+    end, DocsDict0, FailCounters),
+    skip_message({length(NewCounters), DocCount, W, NewCounters, DocsDict});
+handle_message({rexi_EXIT, _}, Worker, Acc0) ->
+    {WC, DocCount , W, Counters, DocsDict0} = Acc0,
+    % fill DocsDict with error messages for relevant Docs
+    {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
+    Replies = [{error, internal_server_error} || _D <- Docs],
+    DocsDict = append_purge_replies(Docs, Replies, DocsDict0),
+    skip_message({WC-1, DocCount, W, NewCounters, DocsDict});
+handle_message({ok, Replies0}, Worker, Acc0) ->
+    {WCount, DocCount, W, Counters, DocsDict0} = Acc0,
+    {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
+    DocsDict = append_purge_replies(Docs, Replies0, DocsDict0),
+    case {WCount, dict:size(DocsDict)} of
+    {1, _} ->
+        % last message has arrived, we need to conclude things
+        {Health, W, Replies} = dict:fold(fun force_reply/3, {ok, W, []},
+           DocsDict),
+        {stop, {Health, Replies}};
+    {_, DocCount} ->
+        % we've got at least one reply for each document, let's take a look
+        case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocsDict) of
+        continue ->
+            {ok, {WCount - 1, DocCount, W, NewCounters, DocsDict}};
+        {stop, W, Replies} ->
+            {stop, {ok, Replies}}
+        end;
+    _ ->
+        {ok, {WCount - 1, DocCount, W, NewCounters, DocsDict}}
+    end;
+handle_message({error, purged_docs_limit_exceeded}=Error, Worker, Acc0) ->
+    {WC, DocCount , W, Counters, DocsDict0} = Acc0,
+    % fill DocsDict with error messages for relevant Docs
+    {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
+    Replies = [Error || _D <- Docs],
+    DocsDict = append_purge_replies(Docs, Replies, DocsDict0),
+    skip_message({WC-1, DocCount, W, NewCounters, DocsDict});
+handle_message({bad_request, Msg}, _, _) ->
+    throw({bad_request, Msg}).
+
+
+tag_docs(AllIdsRevs) ->
+    {UUIDs, UUIDsIdsRevs, DocCount} = lists:foldl(fun(
+        {Id, Revs}, {UAcc, UIRAcc, C}) ->
+        UUID = couch_uuids:new(),
+        {[UUID|UAcc], [{UUID, Id, Revs}|UIRAcc], C+1}
+    end, {[], [], 0}, AllIdsRevs),
+    {lists:reverse(UUIDs), lists:reverse(UUIDsIdsRevs), DocCount}.
+
+
+force_reply(Doc, Replies, {Health, W, Acc}) ->
+    case update_quorum_met(W, Replies) of
+    {true, FinalReply} ->
+        {Health, W, [{Doc, FinalReply} | Acc]};
+    false ->
+        case [Reply || {ok, Reply} <- Replies] of
+        [] ->
+            UReplies = lists:usort(Replies),
+            case UReplies of
+                [{error, internal_server_error}] ->
+                    {error, W, [{Doc, {error, internal_server_error}} | Acc]};
+                [{error, timeout}] ->
+                    {error, W, [{Doc, {error, timeout}} | Acc]};
+                [FirstReply|[]] ->
+                    % check if all errors are identical, if so inherit health
+                    {Health, W, [{Doc, FirstReply} | Acc]};
+                _ ->
+                    {error, W, [{Doc, UReplies} | Acc]}
+             end;
+        AcceptedReplies0 ->
+            NewHealth = case Health of ok -> accepted; _ -> Health end,
+            AcceptedReplies = lists:usort(lists:flatten(AcceptedReplies0)),
+            {NewHealth, W, [{Doc, {accepted, AcceptedReplies}} | Acc]}
+        end
+    end.
+
+
+maybe_reply(_, _, continue) ->
+    % we didn't meet quorum for all docs, so we're fast-forwarding the fold
+    continue;
+maybe_reply(Doc, Replies, {stop, W, Acc}) ->
+    case update_quorum_met(W, Replies) of
+    {true, Reply} ->
+        {stop, W, [{Doc, Reply} | Acc]};
+    false ->
+        continue
+    end.
+
+update_quorum_met(W, Replies) ->
+    OkReplies = lists:foldl(fun(Reply, PrevsAcc) ->
+        case Reply of
+            {ok, PurgedRevs} -> [PurgedRevs | PrevsAcc];
+            _ -> PrevsAcc
+        end
+    end, [], Replies),
+    if length(OkReplies) < W -> false; true ->
+        % make a union of PurgedRevs
+        FinalReply = {ok, lists:usort(lists:flatten(OkReplies))},
+        {true, FinalReply}
+    end.
+
+
+group_idrevs_by_shard(DbName, UUIDsIdsRevs) ->
+    lists:foldl(fun({_UUID, Id, _Revs} = UUIDIdRevs, D0) ->
+        lists:foldl(fun(Shard, D1) ->
+            dict:append(Shard, UUIDIdRevs, D1)
+        end, D0, mem3:shards(DbName, Id))
+    end, dict:new(), UUIDsIdsRevs).
+
+
+append_purge_replies([], [], DocReplyDict) ->
+    DocReplyDict;
+append_purge_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
+    append_purge_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+
+
+skip_message({0, _, W, _, DocsDict}) ->
+    {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocsDict),
+    {stop, {Health, Reply}};
+skip_message(Acc0) ->
+    {ok, Acc0}.
+
+
+% EUnit tests
+doc_purge_ok_test() ->
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_,_) -> ok end),
+    meck:expect(couch_log, notice, fun(_,_) -> ok end),
+
+    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    Counters = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+    DocsDict = dict:new(),
+
+    % ***test for W = 2
+    AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, DocsDict},
+    {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} =
+        handle_message({ok,[{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW2),
+    ?assertEqual(2, WaitingCountW2_1),
+    {stop, FinalReplyW2 } =
+        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+            lists:nth(2,Shards), AccW2_1),
+    ?assertEqual(
+        {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
+        FinalReplyW2
+    ),
+
+    % ***test for W = 3
+    AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+        Counters, DocsDict},
+    {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} =
+        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW3),
+    ?assertEqual(2, WaitingCountW3_1),
+    {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} =
+        handle_message({ok,[{ok, Revs1}, {ok, Revs2}]},
+            lists:nth(2,Shards), AccW3_1),
+    ?assertEqual(1, WaitingCountW3_2),
+    {stop, FinalReplyW3 } =
+        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+            lists:nth(3,Shards), AccW3_2),
+    ?assertEqual(
+        {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
+        FinalReplyW3
+    ),
+
+    % *** test rexi_exit on 1 node
+    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, DocsDict},
+    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
+    ?assertEqual(2, WaitingCount1),
+    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+    ?assertEqual(1, WaitingCount2),
+    {stop, Reply} =
+        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+            lists:nth(3,Shards), Acc2),
+    ?assertEqual(
+        {ok,[{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
+        Reply
+    ),
+
+    % *** test {error, purge_during_compaction_exceeded_limit} on all nodes
+    % *** still should return ok reply for the request
+    ErrPDCEL = {error, purge_during_compaction_exceeded_limit},
+    Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+        Counters, DocsDict},
+    {ok, {WaitingCount21,_,_,_,_} = Acc21} =
+        handle_message({ok, [ErrPDCEL, ErrPDCEL]}, hd(Shards), Acc20),
+    ?assertEqual(2, WaitingCount21),
+    {ok, {WaitingCount22,_,_,_,_} = Acc22} =
+        handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(2,Shards), Acc21),
+    ?assertEqual(1, WaitingCount22),
+    {stop, Reply2 } =
+        handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(3,Shards), Acc22),
+    ?assertEqual(
+        {ok, [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}]},
+        Reply2
+    ),
+
+    % *** test {error, purged_docs_limit_exceeded} on all nodes
+    % *** still should return ok reply for the request
+    ErrPDLE = {error, purged_docs_limit_exceeded},
+    Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+        Counters, DocsDict},
+    {ok, {WaitingCount31,_,_,_,_} = Acc31} =
+        handle_message({ok, [ErrPDLE, ErrPDLE]}, hd(Shards), Acc30),
+    ?assertEqual(2, WaitingCount31),
+    {ok, {WaitingCount32,_,_,_,_} = Acc32} =
+        handle_message({ok, [ErrPDLE, ErrPDLE]}, lists:nth(2,Shards), Acc31),
+    ?assertEqual(1, WaitingCount32),
+    {stop, Reply3 } =
+        handle_message({ok, [ErrPDLE, ErrPDLE]},lists:nth(3,Shards), Acc32),
+    ?assertEqual(
+        {ok, [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}]},
+        Reply3
+    ),
+    meck:unload(couch_log).
+
+
+doc_purge_accepted_test() ->
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_,_) -> ok end),
+    meck:expect(couch_log, notice, fun(_,_) -> ok end),
+
+    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    Counters = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+    DocsDict = dict:new(),
+
+    % *** test rexi_exit on 2 nodes
+    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, DocsDict},
+    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+        handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
+    ?assertEqual(2, WaitingCount1),
+    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+        handle_message({rexi_EXIT, nil}, lists:nth(2, Shards), Acc1),
+    ?assertEqual(1, WaitingCount2),
+    {stop, Reply} =
+        handle_message({rexi_EXIT, nil}, lists:nth(3, Shards), Acc2),
+    ?assertEqual(
+        {accepted, [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}]},
+        Reply
+    ),
+    meck:unload(couch_log).
+
+
+doc_purge_error_test() ->
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_,_) -> ok end),
+    meck:expect(couch_log, notice, fun(_,_) -> ok end),
+
+    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    Counters = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+    DocsDict = dict:new(),
+
+    % *** test rexi_exit on all 3 nodes
+    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, DocsDict},
+    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+        handle_message({rexi_EXIT, nil}, hd(Shards), Acc0),
+    ?assertEqual(2, WaitingCount1),
+    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+    ?assertEqual(1, WaitingCount2),
+    {stop, Reply} =
+        handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
+    ?assertEqual(
+        {error, [{UUID1, {error, internal_server_error}},
+            {UUID2, {error, internal_server_error}}]},
+        Reply
+    ),
+
+    % ***test w quorum > # shards, which should fail immediately
+    Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
+    Counters2 = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)),
+    AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters2, DocsDict},
+    Bool =
+        case handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
+                hd(Shards), AccW4) of
+            {stop, _Reply} ->
+                true;
+            _ -> false
+        end,
+    ?assertEqual(true, Bool),
+
+    % *** test Docs with no replies should end up as {error, internal_server_error}
+    SA1 = #shard{node = a, range = [1]},
+    SA2 = #shard{node = a, range = [2]},
+    SB1 = #shard{node = b, range = [1]},
+    SB2 = #shard{node = b, range = [2]},
+    Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]},
+        {SA2,[UUID2]}, {SB2,[UUID2]}],
+    Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2, Counters3, DocsDict},
+    {ok, Acc31} = handle_message({ok, [{ok, Revs1}]}, SA1, Acc30),
+    {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31),
+    {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32),
+    {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33),
+    ?assertEqual(
+        {error, [{UUID1, {accepted, Revs1}},
+            {UUID2, {error, internal_server_error}}]},
+        Acc34
+    ),
+    meck:unload(couch_log).
+
+
+% needed for testing to avoid having to start the mem3 application
+group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) ->
+    lists:foldl(fun({UUID, _Id, _Revs}, Dict0) ->
+        lists:foldl(fun(Shard, Dict1) ->
+            dict:append(Shard, UUID, Dict1)
+        end, Dict0, Shards)
+    end, dict:new(), UUIDsIdsRevs).
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 4a69e7e..6e2c05f 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -21,6 +21,7 @@
     delete_shard_db_doc/2]).
 -export([get_all_security/2, open_shard/2]).
 -export([compact/1, compact/2]).
+-export([get_purge_seq/2, purge_docs/3, set_purged_docs_limit/3]).
 
 -export([get_db_info/2, get_doc_count/2, get_update_seq/2,
          changes/4, map_view/5, reduce_view/5, group_info/3, update_mrview/4]).
@@ -201,6 +202,9 @@ get_all_security(DbName, Options) ->
 set_revs_limit(DbName, Limit, Options) ->
     with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
 
+set_purged_docs_limit(DbName, Limit, Options) ->
+    with_db(DbName, Options, {couch_db, set_purged_docs_limit, [Limit]}).
+
 open_doc(DbName, DocId, Options) ->
     with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
 
@@ -236,14 +240,31 @@ get_missing_revs(DbName, IdRevsList, Options) ->
     end).
 
 update_docs(DbName, Docs0, Options) ->
+    X = case proplists:get_value(replicated_changes, Options) of
+        true -> replicated_changes;
+        _ -> interactive_edit
+    end,
+    DocsByNode = couch_util:get_value(read_repair, Options),
+    case {X, DocsByNode} of
+        {_, undefined} ->
+            Docs = make_att_readers(Docs0),
+            with_db(DbName, Options,
+                {couch_db, update_docs, [Docs, Options, X]});
+        {replicated_changes, _} ->
+            update_docs_read_repair(DbName, DocsByNode, Options)
+    end.
+
+get_purge_seq(DbName, Options) ->
+    with_db(DbName, Options, {couch_db, get_purge_seq, []}).
+
+purge_docs(DbName, UUIdsIdsRevs, Options) ->
     case proplists:get_value(replicated_changes, Options) of
-    true ->
-        X = replicated_changes;
-    _ ->
-        X = interactive_edit
+        true ->
+            X = replicated_changes;
+        _ ->
+            X = interactive_edit
     end,
-    Docs = make_att_readers(Docs0),
-    with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+    with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, X]}).
 
 %% @equiv group_info(DbName, DDocId, [])
 group_info(DbName, DDocId) ->
@@ -298,6 +319,75 @@ with_db(DbName, Options, {M,F,A}) ->
         rexi:reply(Error)
     end.
 
+
+update_docs_read_repair(DbName, DocsByNode, Options) ->
+    set_io_priority(DbName, Options),
+    case get_or_create_db(DbName, Options) of
+    {ok, Db} ->
+        % omit Revisions that have been purged
+        Docs = filter_purged_revs(Db, DocsByNode),
+        Docs2 = make_att_readers(Docs),
+        {M,F,A} = {couch_db, update_docs, [Docs2, Options, replicated_changes]},
+        rexi:reply(try
+            apply(M, F, [Db | A])
+        catch Exception ->
+            Exception;
+        error:Reason ->
+            couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason,
+                clean_stack()]),
+            {error, Reason}
+        end);
+    Error ->
+        rexi:reply(Error)
+    end.
+
+
+% Given [{Node, Doc}] pairs carrying different revs of the same DocID
+% from different nodes, returns [Doc] with purged docs filtered out.
+% This is done for read-repair from fabric_doc_open, so that we do not
+% recreate Docs that have already been purged on this node() from
+% Nodes that are out of sync.
+filter_purged_revs(Db, DocsByNode) ->
+    AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
+    {ok, DbPSeq} = couch_db:get_purge_seq(Db),
+    PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) ->  [{Id, Revs}|Acc]  end,
+    V = "v" ++ config:get("purge", "version", "1") ++ "-",
+    StartKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++  V ++ "mem3-"),
+    EndKey = ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ V ++ "mem31"),
+    Opts = [{start_key, StartKey}, {end_key_gt, EndKey}],
+    % go through _local/purge-mem3-.. docs
+    % find Node that this LDoc corresponds to
+    % check if update from Node has not been recently purged on current node
+    LDocsFoldFun = fun(#doc{body={Props}}, Acc) ->
+        {VOps} = couch_util:get_value(<<"verify_options">>, Props),
+        Node = couch_util:get_value(<<"node">>, VOps),
+        Result = lists:keyfind(Node, 1, DocsByNode),
+        NewAcc = if not Result -> Acc; true ->
+            {Node, Doc} = Result,
+            NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
+            if  NodePSeq == DbPSeq ->
+                    [Doc|Acc];
+                (NodePSeq+AllowedPSeqLag) < DbPSeq ->
+                    % Node is very out of sync, ignore updates from it
+                    Acc;
+                true -> %(NodePSeq+AllowedPSeqLag) >= DbPSeq
+                    % if Doc has been purged recently, then ignore it
+                    {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
+                            NodePSeq, PurgeFoldFun, [], []),
+                    {Start, [FirstRevId|_]} = Doc#doc.revs,
+                    DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
+                    case lists:member(DocIdRevs, PurgedIdsRevs) of
+                        true -> Acc;
+                        false -> [Doc|Acc]
+                    end
+            end
+        end,
+        {ok, NewAcc}
+    end,
+    {ok, Docs} = couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
+    Docs.
+
+
 get_or_create_db(DbName, Options) ->
     couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
 
