You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/15 19:05:55 UTC

fabric commit: updated refs/heads/COUCHDB-3326-clustered-purge to 9b6662a

Repository: couchdb-fabric
Updated Branches:
  refs/heads/COUCHDB-3326-clustered-purge [created] 9b6662aff


Implement clustered purge API

COUCHDB-3326


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/9b6662af
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/9b6662af
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/9b6662af

Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 9b6662aff94a2ea7239624c16ce5b97c1ccc088b
Parents: c4ae2f5
Author: Mayya Sharipova <ma...@ca.ibm.com>
Authored: Fri Oct 21 10:57:50 2016 -0400
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Mar 15 14:05:21 2017 -0500

----------------------------------------------------------------------
 src/fabric.erl           |  34 +++-
 src/fabric_db_meta.erl   |  26 ++-
 src/fabric_doc_open.erl  |  46 +++--
 src/fabric_doc_purge.erl | 424 ++++++++++++++++++++++++++++++++++++++++++
 src/fabric_rpc.erl       | 103 +++++++++-
 5 files changed, 607 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric.erl
----------------------------------------------------------------------
diff --git a/src/fabric.erl b/src/fabric.erl
index c0f95df..7e0e777 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -21,12 +21,13 @@
     delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
     set_security/2, set_security/3, get_revs_limit/1, get_security/1,
     get_security/2, get_all_security/1, get_all_security/2,
+    get_purged_docs_limit/1, set_purged_docs_limit/3, get_purge_seq/1,
     compact/1, compact/2]).
 
 % Documents
 -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
     get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3,
-    purge_docs/2, att_receiver/2]).
+    purge_docs/3, att_receiver/2]).
 
 % Views
 -export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
@@ -70,7 +71,8 @@ all_dbs(Prefix) when is_list(Prefix) ->
 
 %% @doc returns a property list of interesting properties
 %%      about the database such as `doc_count', `disk_size',
-%%      etc.
+%%      purge_quality 0-100 - % of converged shards that share
+%%      the largest purge_seq etc.
 -spec get_db_info(dbname()) ->
     {ok, [
         {instance_start_time, binary()} |
@@ -127,6 +129,25 @@ get_revs_limit(DbName) ->
     {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
     try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end.
 
+%% @doc sets the upper bound for the number of stored purge requests
+-spec set_purged_docs_limit(dbname(), pos_integer(), [option()]) -> ok.
+set_purged_docs_limit(DbName, Limit, Options)
+        when is_integer(Limit), Limit > 0 ->
+    fabric_db_meta:set_purged_docs_limit(dbname(DbName), Limit, opts(Options)).
+
+%% @doc retrieves the upper bound for the number of stored purge requests
+-spec get_purged_docs_limit(dbname()) -> pos_integer() | no_return().
+get_purged_docs_limit(DbName) ->
+    {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+    try couch_db:get_purged_docs_limit(Db) after catch couch_db:close(Db) end.
+
+%% @doc retrieves the current purge_seq
+-spec get_purge_seq(dbname()) -> non_neg_integer() | no_return().
+get_purge_seq(DbName) ->
+    {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+    try couch_db:get_purge_seq(Db) after catch couch_db:close(Db) end.
+
+
 %% @doc sets the readers/writers/admin permissions for a database
 -spec set_security(dbname(), SecObj::json_obj()) -> ok.
 set_security(DbName, SecObj) ->
@@ -267,8 +288,13 @@ update_docs(DbName, Docs, Options) ->
         {aborted, PreCommitFailures}
     end.
 
-purge_docs(_DbName, _IdsRevs) ->
-    not_implemented.
+%% @doc purge revisions for a list '{Id, Revs}'
+%%      returns {ok, {PurgeSeq, [IdRevs]}}
+-spec purge_docs(dbname(), [{docid(), [revision()]}], [option()]) ->
+    {ok, {any(), [{docid(), [any()]}] }}.
+purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+    IdsRevs2 = [idrevs(IdRs) || IdRs <- IdsRevs],
+    fabric_doc_purge:go(dbname(DbName), IdsRevs2, opts(Options)).
 
 %% @doc spawns a process to upload attachment data and
 %%      returns a function that shards can use to communicate

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric_db_meta.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_meta.erl b/src/fabric_db_meta.erl
index 367ef06..4fd9365 100644
--- a/src/fabric_db_meta.erl
+++ b/src/fabric_db_meta.erl
@@ -12,7 +12,8 @@
 
 -module(fabric_db_meta).
 
--export([set_revs_limit/3, set_security/3, get_all_security/2]).
+-export([set_revs_limit/3, set_security/3, get_all_security/2,
+    set_purged_docs_limit/3]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -48,6 +49,29 @@ handle_revs_message(Error, _, _Acc) ->
     {error, Error}.
 
 
+set_purged_docs_limit(DbName, Limit, Options) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, set_purged_docs_limit, [Limit, Options]),
+    Handler = fun handle_purge_message/3,
+    Acc0 = {Workers, length(Workers) - 1},
+    case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of
+        {ok, ok} ->
+            ok;
+        {timeout, {DefunctWorkers, _}} ->
+            fabric_util:log_timeout(DefunctWorkers, "set_purged_docs_limit"),
+            {error, timeout};
+        Error ->
+            Error
+    end.
+
+handle_purge_message(ok, _, {_Workers, 0}) ->
+    {stop, ok};
+handle_purge_message(ok, Worker, {Workers, Waiting}) ->
+    {ok, {lists:delete(Worker, Workers), Waiting - 1}};
+handle_purge_message(Error, _, _Acc) ->
+    {error, Error}.
+
+
 set_security(DbName, SecObj, Options) ->
     Shards = mem3:shards(DbName),
     RexiMon = fabric_util:create_monitors(Shards),

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric_doc_open.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl
index 9c45bd9..f57e317 100644
--- a/src/fabric_doc_open.erl
+++ b/src/fabric_doc_open.erl
@@ -25,7 +25,8 @@
     r,
     state,
     replies,
-    q_reply
+    q_reply,
+    replies_by_node=[] %[{Node, Reply}] used for checking purged docs
 }).
 
 
@@ -83,7 +84,8 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
     end;
 handle_message(Reply, Worker, Acc) ->
     NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
-    NewAcc = Acc#acc{replies = NewReplies},
+    NewNReplies = [{Worker#shard.node, Reply}|Acc#acc.replies_by_node],
+    NewAcc = Acc#acc{replies = NewReplies, replies_by_node = NewNReplies},
     case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
     {true, QuorumReply} ->
         fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -122,14 +124,15 @@ is_r_met(Workers, Replies, R) ->
         no_more_workers
     end.
 
-read_repair(#acc{dbname=DbName, replies=Replies}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, replies_by_node=NReplies0}) ->
     Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
+    NReplies = [{Node, Doc} || {Node, {ok, #doc{}=Doc}} <- NReplies0],
     case Docs of
     % omit local docs from read repair
     [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
         choose_reply(Docs);
     [#doc{id=Id} | _] ->
-        Opts = [replicated_changes, ?ADMIN_CTX],
+        Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NReplies}],
         Res = fabric:update_docs(DbName, Docs, Opts),
         case Res of
             {ok, []} ->
@@ -319,15 +322,17 @@ handle_message_reply_test() ->
     ?assertEqual(
         {ok, Acc0#acc{
             workers=[Worker0, Worker1],
-            replies=[fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(foo,1)],
+            replies_by_node=[{undefined, foo}]
         }},
         handle_message(foo, Worker2, Acc0)
     ),
 
-    ?assertEqual(
+   ?assertEqual(
         {ok, Acc0#acc{
             workers=[Worker0, Worker1],
-            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
+            replies_by_node =[{undefined, bar}]
         }},
         handle_message(bar, Worker2, Acc0#acc{
             replies=[fabric_util:kv(foo,1)]
@@ -339,18 +344,24 @@ handle_message_reply_test() ->
     % is returned. Bit subtle on the assertions here.
 
     ?assertEqual(
-        {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)]}},
+        {stop, Acc0#acc{
+            workers=[],
+            replies=[fabric_util:kv(foo,1)],
+            replies_by_node=[{undefined, foo}]
+        }},
         handle_message(foo, Worker0, Acc0#acc{workers=[Worker0]})
     ),
 
     ?assertEqual(
         {stop, Acc0#acc{
             workers=[],
-            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
+            replies_by_node =[{undefined, bar}, {undefined, foo}]
         }},
         handle_message(bar, Worker0, Acc0#acc{
             workers=[Worker0],
-            replies=[fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(foo,1)],
+            replies_by_node=[{undefined, foo}]
         })
     ),
 
@@ -362,11 +373,13 @@ handle_message_reply_test() ->
             workers=[],
             replies=[fabric_util:kv(foo,2)],
             state=r_met,
-            q_reply=foo
+            q_reply=foo,
+            replies_by_node =[{undefined, foo}, {undefined, foo}]
         }},
         handle_message(foo, Worker1, Acc0#acc{
             workers=[Worker0, Worker1],
-            replies=[fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(foo,1)],
+            replies_by_node =[{undefined, foo}]
         })
     ),
 
@@ -376,7 +389,8 @@ handle_message_reply_test() ->
             r=1,
             replies=[fabric_util:kv(foo,1)],
             state=r_met,
-            q_reply=foo
+            q_reply=foo,
+            replies_by_node =[{undefined, foo}]
         }},
         handle_message(foo, Worker0, Acc0#acc{r=1})
     ),
@@ -386,11 +400,13 @@ handle_message_reply_test() ->
             workers=[],
             replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,2)],
             state=r_met,
-            q_reply=foo
+            q_reply=foo,
+            replies_by_node =[{undefined, foo}, {undefined, foo}, {undefined, bar}]
         }},
         handle_message(foo, Worker0, Acc0#acc{
             workers=[Worker0],
-            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+            replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
+            replies_by_node =[{undefined, foo}, {undefined, bar}]
         })
     ),
 

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric_doc_purge.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_purge.erl b/src/fabric_doc_purge.erl
new file mode 100644
index 0000000..58e7a5e
--- /dev/null
+++ b/src/fabric_doc_purge.erl
@@ -0,0 +1,424 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_purge).
+
+-export([go/3]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(_, [], _) ->
+    {ok, []};
+go(DbName, AllIdsRevs, Opts) ->
+    % tag each purge request with UUId
+    {AllUUIDs, AllUUIDsIdsRevs, DocCount} = tag_docs(AllIdsRevs),
+
+    Options = lists:delete(all_or_nothing, Opts),
+    % Counters -> [{Worker, UUIDs}]
+    {Counters, Workers} = dict:fold(fun(Shard, UUIDsIdsRevs, {Cs,Ws}) ->
+        UUIDs = [UUID || {UUID, _Id, _Revs} <-UUIDsIdsRevs],
+        #shard{name=Name, node=Node} = Shard,
+        Ref = rexi:cast(Node, {fabric_rpc, purge_docs, [Name, UUIDsIdsRevs, Options]}),
+        Worker = Shard#shard{ref=Ref},
+        { [{Worker, UUIDs}|Cs], [Worker|Ws]}
+    end, {[], []}, group_idrevs_by_shard(DbName, AllUUIDsIdsRevs)),
+
+    RexiMon = fabric_util:create_monitors(Workers),
+    W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
+    % ResultsAcc -> {PSeqsCs, DocsDict}
+    % PSeqsCs -> [{Shard, PurgeSeq}]
+    % DocsDict -> UUID -> [{ok, PurgedRevs}]
+    ResultsAcc = {[], dict:new()},
+    Acc = {length(Workers), DocCount, list_to_integer(W), Counters, ResultsAcc},
+    Timeout = fabric_util:request_timeout(),
+    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc, infinity, Timeout) of
+    {ok, {Health, PSeq, Results}} when Health =:= ok; Health =:= accepted ->
+        % Results-> [{UUID, {ok, Revs}}]
+        {Health, {PSeq, [R || R <-
+            couch_util:reorder_results(AllUUIDs, Results), R =/= noreply]}};
+    {timeout, Acc1} ->
+        {_, _, W1, Counters1, {PSeqsCs, DocsDict1}} = Acc1,
+        {DefunctWorkers, _} = lists:unzip(Counters1),
+        fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+        {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []}, DocsDict1),
+        FinalPSeq = fabric_view_changes:pack_seqs(PSeqsCs),
+        {Health, {FinalPSeq, [R || R <-
+            couch_util:reorder_results(AllUUIDs, Resp), R =/= noreply]}};
+    Else ->
+        Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) ->
+    {_, DocCount, W, Counters, {PSeqsCs, DocsDict0}} = Acc0,
+    {FailCounters, NewCounters} = lists:partition(fun({#shard{node=N}, _}) ->
+        N == NodeRef
+    end, Counters),
+    % fill DocsDict with error messages for relevant Docs
+    DocsDict = lists:foldl(fun({_W, Docs}, CDocsDict) ->
+        Replies = [{error, internal_server_error} || _D <- Docs],
+        append_update_replies(Docs, Replies, CDocsDict)
+    end, DocsDict0, FailCounters),
+    Results = {PSeqsCs, DocsDict},
+    skip_message({length(NewCounters), DocCount, W, NewCounters, Results});
+handle_message({rexi_EXIT, _}, Worker, Acc0) ->
+    {WC, DocCount , W, Counters, {PSeqsCs, DocsDict0}} = Acc0,
+    % fill DocsDict with error messages for relevant Docs
+    {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
+    Replies = [{error, internal_server_error} || _D <- Docs],
+    DocsDict = append_update_replies(Docs, Replies, DocsDict0),
+    skip_message({WC-1, DocCount, W, NewCounters, {PSeqsCs, DocsDict}});
+handle_message({ok, {PSeq, Replies0}}, Worker, Acc0) ->
+    {WCount, DocCount, W, Counters, {PSeqsCs0, DocsDict0}} = Acc0,
+    {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
+    DocsDict = append_update_replies(Docs, Replies0, DocsDict0),
+    PSeqsCs = [{Worker, PSeq}| PSeqsCs0],
+    case {WCount, dict:size(DocsDict)} of
+    {1, _} ->
+        % last message has arrived, we need to conclude things
+        {Health, W, Replies} = dict:fold(fun force_reply/3, {ok, W, []},
+           DocsDict),
+        FinalPSeq = fabric_view_changes:pack_seqs(PSeqsCs),
+        {stop, {Health, FinalPSeq, Replies}};
+    {_, DocCount} ->
+        % we've got at least one reply for each document, let's take a look
+        case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocsDict) of
+        continue ->
+            {ok, {WCount - 1, DocCount, W, NewCounters, {PSeqsCs, DocsDict}}};
+        {stop, W, Replies} ->
+            FinalPSeq = fabric_view_changes:pack_seqs(PSeqsCs),
+            {stop, {ok, FinalPSeq, Replies}}
+        end;
+    _ ->
+        {ok, {WCount - 1, DocCount, W, NewCounters, {PSeqsCs, DocsDict}}}
+    end;
+handle_message({error, purged_docs_limit_exceeded}=Error, Worker, Acc0) ->
+    {WC, DocCount , W, Counters, {PSeqsCs, DocsDict0}} = Acc0,
+    % fill DocsDict with error messages for relevant Docs
+    {value, {_W, Docs}, NewCounters} = lists:keytake(Worker, 1, Counters),
+    Replies = [Error || _D <- Docs],
+    DocsDict = append_update_replies(Docs, Replies, DocsDict0),
+    skip_message({WC-1, DocCount, W, NewCounters, {PSeqsCs, DocsDict}});
+handle_message({bad_request, Msg}, _, _) ->
+    throw({bad_request, Msg}).
+
+
+tag_docs(AllIdsRevs) ->
+    lists:foldr(fun({Id, Revs}, {UUIDsAcc, UUIDsIdsRevsAcc, L}) ->
+        UUID = couch_uuids:new(),
+        NewUUIDsAcc = [UUID | UUIDsAcc],
+        NewUUIDsIdsRevsAcc = [{UUID, Id, Revs} | UUIDsIdsRevsAcc],
+        {NewUUIDsAcc, NewUUIDsIdsRevsAcc, L+1}
+    end, {[], [], 0}, AllIdsRevs).
+
+
+force_reply(Doc, Replies, {Health, W, Acc}) ->
+    case update_quorum_met(W, Replies) of
+    {true, FinalReply} ->
+        {Health, W, [{Doc, FinalReply} | Acc]};
+    false ->
+        case [Reply || {ok, Reply} <- Replies] of
+        [] ->
+            UReplies = lists:usort(Replies),
+            case UReplies of
+                [{error, internal_server_error}] ->
+                    {error, W, [{Doc, {error, internal_server_error}} | Acc]};
+                [FirstReply|[]] ->
+                    % check if all errors are identical, if so inherit health
+                    {Health, W, [{Doc, FirstReply} | Acc]};
+                _ ->
+                    {error, W, [{Doc, UReplies} | Acc]}
+             end;
+
+        AcceptedReplies0 ->
+            NewHealth = case Health of ok -> accepted; _ -> Health end,
+            AcceptedReplies = lists:usort(lists:flatten(AcceptedReplies0)),
+            {NewHealth, W, [{Doc, {accepted, AcceptedReplies}} | Acc]}
+        end
+    end.
+
+
+maybe_reply(_, _, continue) ->
+    % we didn't meet quorum for all docs, so we're fast-forwarding the fold
+    continue;
+maybe_reply(Doc, Replies, {stop, W, Acc}) ->
+    case update_quorum_met(W, Replies) of
+    {true, Reply} ->
+        {stop, W, [{Doc, Reply} | Acc]};
+    false ->
+        continue
+    end.
+
+update_quorum_met(W, Replies) ->
+    OkReplies = lists:foldl(fun(Reply, PrevsAcc) ->
+        case Reply of
+            {ok, PurgedRevs} -> [PurgedRevs | PrevsAcc];
+            _ -> PrevsAcc
+        end
+    end, [], Replies),
+    if length(OkReplies) < W -> false; true ->
+        % make a union of PurgedRevs
+        FinalReply = {ok, lists:usort(lists:flatten(OkReplies))},
+        {true, FinalReply}
+    end.
+
+
+group_idrevs_by_shard(DbName, UUIDsIdsRevs) ->
+    lists:foldl(fun({_UUID, Id, _Revs} = UUIDIdRevs, D0) ->
+        lists:foldl(fun(Shard, D1) ->
+            dict:append(Shard, UUIDIdRevs, D1)
+        end, D0, mem3:shards(DbName, Id))
+    end, dict:new(), UUIDsIdsRevs).
+
+
+append_update_replies([], [], DocReplyDict) ->
+    DocReplyDict;
+append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
+    append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+
+
+skip_message({0, _, W, _, {PSeqsCs, DocsDict}}) ->
+    {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocsDict),
+    FinalPSeq = fabric_view_changes:pack_seqs(PSeqsCs),
+    {stop, {Health, FinalPSeq, Reply}};
+skip_message(Acc0) ->
+    {ok, Acc0}.
+
+
+% eunits
+doc_purge_ok_test() ->
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_,_) -> ok end),
+    meck:expect(couch_log, notice, fun(_,_) -> ok end),
+
+    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    Counters = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+    DocsDict = dict:new(),
+
+    % ***test for W = 2
+    AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), AccW2),
+    ?assertEqual(2, WaitingCountW2_1),
+    {stop, FinalReplyW2 } =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}},
+            lists:nth(2,Shards), AccW2_1),
+    ?assertMatch(
+        {ok, _PSeq, Replies} when Replies ==
+            [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}],
+        FinalReplyW2
+    ),
+
+    % ***test for W = 3
+    AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), AccW3),
+    ?assertEqual(2, WaitingCountW3_1),
+    {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}},
+            lists:nth(2,Shards), AccW3_1),
+    ?assertEqual(1, WaitingCountW3_2),
+    {stop, FinalReplyW3 } =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}},
+            lists:nth(3,Shards), AccW3_2),
+    ?assertMatch(
+        {ok, _PSeq, Replies} when Replies ==
+            [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}],
+        FinalReplyW3
+    ),
+
+    % *** test rexi_exit on 1 node
+    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), Acc0),
+    ?assertEqual(2, WaitingCount1),
+    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+    ?assertEqual(1, WaitingCount2),
+    {stop, Reply} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, lists:nth(3,Shards), Acc2),
+    ?assertMatch(
+        {ok, _PSeq, Replies} when Replies ==
+            [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}],
+        Reply
+    ),
+
+    % *** test {error, purge_during_compaction_exceeded_limit} on all nodes
+    % *** still should return ok reply for the request
+    ErrPDCEL = {error, purge_during_compaction_exceeded_limit},
+    Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount21,_,_,_,_} = Acc21} =
+        handle_message({ok,{0,[ErrPDCEL, ErrPDCEL]}}, hd(Shards), Acc20),
+    ?assertEqual(2, WaitingCount21),
+    {ok, {WaitingCount22,_,_,_,_} = Acc22} =
+        handle_message({ok,{0,[ErrPDCEL, ErrPDCEL]}},
+            lists:nth(2,Shards), Acc21),
+    ?assertEqual(1, WaitingCount22),
+    {stop, Reply2 } =
+        handle_message({ok,{0,[ErrPDCEL, ErrPDCEL]}},
+            lists:nth(3,Shards), Acc22),
+    ?assertMatch(
+        {ok, _PSeq, Replies2} when Replies2 ==
+            [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}],
+        Reply2
+    ),
+
+    % *** test {error, purged_docs_limit_exceeded} on all nodes
+    % *** still should return ok reply for the request
+    ErrPDLE = {error, purged_docs_limit_exceeded},
+    Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount31,_,_,_,_} = Acc31} =
+        handle_message({ok,{0,[ErrPDLE, ErrPDLE]}}, hd(Shards), Acc30),
+    ?assertEqual(2, WaitingCount31),
+    {ok, {WaitingCount32,_,_,_,_} = Acc32} =
+        handle_message({ok,{0,[ErrPDLE, ErrPDLE]}},
+            lists:nth(2,Shards), Acc31),
+    ?assertEqual(1, WaitingCount32),
+    {stop, Reply3 } =
+        handle_message({ok,{0,[ErrPDLE, ErrPDLE]}},
+            lists:nth(3,Shards), Acc32),
+    ?assertMatch(
+        {ok, _PSeq, Replies3} when Replies3 ==
+            [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}],
+        Reply3
+    ),
+    meck:unload(couch_log).
+
+
+doc_purge_accepted_test() ->
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_,_) -> ok end),
+    meck:expect(couch_log, notice, fun(_,_) -> ok end),
+
+    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    Counters = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+    DocsDict = dict:new(),
+
+    % *** test rexi_exit on 2 nodes
+    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), Acc0),
+    ?assertEqual(2, WaitingCount1),
+    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+    ?assertEqual(1, WaitingCount2),
+    {stop, Reply} =
+        handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
+    ?assertMatch(
+        {accepted, _PSeq, Replies} when Replies ==
+            [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}],
+        Reply
+    ),
+    meck:unload(couch_log).
+
+
+doc_purge_error_test() ->
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_,_) -> ok end),
+    meck:expect(couch_log, notice, fun(_,_) -> ok end),
+
+    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    Counters = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+    DocsDict = dict:new(),
+
+    % *** test rexi_exit on all 3 nodes
+    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+        handle_message({rexi_EXIT, nil}, hd(Shards), Acc0),
+    ?assertEqual(2, WaitingCount1),
+    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+    ?assertEqual(1, WaitingCount2),
+    {stop, Reply} =
+        handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
+    ?assertMatch(
+        {error, _PSeq, Replies} when Replies ==
+            [{UUID1, {error, internal_server_error}},
+            {UUID2, {error, internal_server_error}}],
+        Reply
+    ),
+
+    % ***test w quorum > # shards, which should fail immediately
+    Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
+    Counters2 = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)),
+    AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters2, {[], DocsDict}},
+    Bool =
+        case handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), AccW4) of
+            {stop, _Reply} ->
+                true;
+            _ -> false
+        end,
+    ?assertEqual(true, Bool),
+
+    % *** test Docs with no replies should end up as {error, internal_server_error}
+    SA1 = #shard{node=a, range=1},
+    SA2 = #shard{node=a, range=2},
+    SB1 = #shard{node=b, range=1},
+    SB2 = #shard{node=b, range=2},
+    Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]},
+        {SA2,[UUID2]}, {SB2,[UUID2]}],
+    Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2,
+        Counters3, {[], DocsDict}},
+    {ok, Acc31} = handle_message({ok,{1,[{ok, Revs1}]}}, SA1, Acc30),
+    {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31),
+    {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32),
+    {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33),
+    ?assertMatch(
+        {error, _PSeq, Replies3} when Replies3 ==
+            [{UUID1, {accepted, Revs1}},
+            {UUID2, {error, internal_server_error}}],
+        Acc34
+    ),
+    meck:unload(couch_log).
+
+
+% needed for testing to avoid having to start the mem3 application
+group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) ->
+    lists:foldl(fun({UUID, _Id, _Revs}, Dict0) ->
+        lists:foldl(fun(Shard, Dict1) ->
+            dict:append(Shard, UUID, Dict1)
+        end, Dict0, Shards)
+    end, dict:new(), UUIDsIdsRevs).
+

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 6dee870..f7c079b 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -20,6 +20,7 @@
     set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).
 -export([get_all_security/2, open_shard/2]).
 -export([compact/1, compact/2]).
+-export([get_purge_seq/2, purge_docs/3, set_purged_docs_limit/3]).
 
 -export([get_db_info/2, get_doc_count/2, get_update_seq/2,
          changes/4, map_view/5, reduce_view/5, group_info/3, update_mrview/4]).
@@ -197,6 +198,9 @@ get_all_security(DbName, Options) ->
 set_revs_limit(DbName, Limit, Options) ->
     with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
 
+set_purged_docs_limit(DbName, Limit, Options) ->
+    with_db(DbName, Options, {couch_db, set_purged_docs_limit, [Limit]}).
+
 open_doc(DbName, DocId, Options) ->
     with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
 
@@ -232,14 +236,33 @@ get_missing_revs(DbName, IdRevsList, Options) ->
     end).
 
 update_docs(DbName, Docs0, Options) ->
+    X = case proplists:get_value(replicated_changes, Options) of
+        true -> replicated_changes;
+        _ -> interactive_edit
+    end,
+    DocsByNode = couch_util:get_value(read_repair, Options),
+    case {X, DocsByNode} of
+        {_, undefined} ->
+            Docs = make_att_readers(Docs0),
+            with_db(DbName, Options,
+                {couch_db, update_docs, [Docs, Options, X]});
+        {replicated_changes, _} ->
+            update_docs_read_repair(DbName, DocsByNode, Options)
+    end.
+
+
+get_purge_seq(DbName, DbOptions) ->
+    with_db(DbName, DbOptions, {couch_db, get_purge_seq, []}).
+
+purge_docs(DbName, UUIdsIdsRevs, Options) ->
     case proplists:get_value(replicated_changes, Options) of
-    true ->
-        X = replicated_changes;
-    _ ->
-        X = interactive_edit
+        true ->
+            X = replicated_changes;
+        _ ->
+            X = interactive_edit
     end,
-    Docs = make_att_readers(Docs0),
-    with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+    with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, X]}).
+
 
 %% @equiv group_info(DbName, DDocId, [])
 group_info(DbName, DDocId) ->
@@ -294,6 +317,74 @@ with_db(DbName, Options, {M,F,A}) ->
         rexi:reply(Error)
     end.
 
+
+update_docs_read_repair(DbName, DocsByNode, Options) ->
+    set_io_priority(DbName, Options),
+    case get_or_create_db(DbName, Options) of
+    {ok, Db} ->
+        % omit Revisions that have been purged
+        Docs = filter_purged_revs(Db, DocsByNode),
+        Docs2 = make_att_readers(Docs),
+        {M,F,A} = {couch_db, update_docs, [Docs2, Options, replicated_changes]},
+        rexi:reply(try
+            apply(M, F, [Db | A])
+        catch Exception ->
+            Exception;
+        error:Reason ->
+            couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason,
+                clean_stack()]),
+            {error, Reason}
+        end);
+    Error ->
+        rexi:reply(Error)
+    end.
+
+
+% given [{Node, Doc}] diff revs of the same DocID from diff nodes
+% returns [Doc] filtering out purged docs.
+% This is done for read-repair from fabric_doc_open,
+% so that not to recreate Docs that have been purged before
+% on this node() from Nodes that are out of sync.
+filter_purged_revs(Db, DocsByNode) ->
+    AllowedPSeqLag = config:get_integer("couchdb",
+        "allowed_purge_seq_lag", 100),
+    DbPSeq = couch_db:get_purge_seq(Db),
+    PurgeFoldFun = fun({_P,_U, Id, Revs}, Acc) ->  [{Id, Revs}|Acc]  end,
+    Opts = [{start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-mem3-")},
+            {end_key_gt, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-mem31")}],
+    % go through _local/purge-mem3-.. docs
+    % find Node that this LDoc corresponds to
+    % check if the update from Node has not been recently purged on current node
+    LDocsFoldFun = fun(#doc{body={Props}}, Acc) ->
+        {VOps} = couch_util:get_value(<<"verify_options">>, Props),
+        Node = couch_util:get_value(<<"node">>, VOps),
+        Result = lists:keyfind(Node, 1, DocsByNode),
+        NewAcc = if not Result -> Acc; true ->
+            {Node, Doc} = Result,
+            NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
+            if  NodePSeq == DbPSeq ->
+                    [Doc|Acc];
+                (NodePSeq+AllowedPSeqLag) < DbPSeq ->
+                    % Node is very out of sync, ignore updates from it
+                    Acc;
+                true -> %(NodePSeq+ClientAllowedPSeqLag) >= DbPSeq
+                    % if Doc has been purged recently, than ignore it
+                    {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
+                            NodePSeq, PurgeFoldFun, [], []),
+                    {Start, [FirstRevId|_]} = Doc#doc.revs,
+                    DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
+                    case lists:member(DocIdRevs, PurgedIdsRevs) of
+                        true -> Acc;
+                        false -> [Doc|Acc]
+                    end
+            end
+        end,
+        {ok, NewAcc}
+    end,
+    {ok, Docs} = couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
+    Docs.
+
+
 get_or_create_db(DbName, Options) ->
     couch_db:open_int(DbName, [{create_if_missing, true} | Options]).