You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ji...@apache.org on 2018/07/10 06:38:13 UTC

[couchdb] branch COUCHDB-3326-clustered-purge-pr5-implementation updated (2636973 -> a6c269e)

This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a change to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


    omit 2636973  [10/10] Clustered Purge: Clustered HTTP API
    omit bbf8bdf  [09/10] Clustered Purge: Fabric API
    omit df36be6  [08/10] Clustered Purge: Update read-repair
    omit d5d9219  [07/10] Clustered Purge: Internal replication
    omit 432e717  [06/10] Clustered Purge: Update mrview indexes
    omit 269132e  [05/10] Clustered Purge: Add upgrade tests
     new f291730  [05/10] Clustered Purge: Add upgrade tests
     new 0cc7757  [06/10] Clustered Purge: Update mrview indexes
     new d8e2f8c  [07/10] Clustered Purge: Internal replication
     new a72f60e  [08/10] Clustered Purge: Update read-repair
     new f3fddcc  [09/10] Clustered Purge: Fabric API
     new a6c269e  [10/10] Clustered Purge: Clustered HTTP API

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (2636973)
            \
             N -- N -- N   refs/heads/COUCHDB-3326-clustered-purge-pr5-implementation (a6c269e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch_pse_tests/src/cpse_test_purge_docs.erl   | 92 +++++++++++-----------
 .../src/cpse_test_purge_replication.erl            | 20 ++---
 src/couch_pse_tests/src/cpse_test_purge_seqs.erl   | 28 +++----
 src/couch_pse_tests/src/cpse_util.erl              |  6 +-
 4 files changed, 73 insertions(+), 73 deletions(-)


[couchdb] 03/06: [07/10] Clustered Purge: Internal replication

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit d8e2f8c88f772b3be1a6b3117795c9aa83626318
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Apr 24 12:27:14 2018 -0500

    [07/10] Clustered Purge: Internal replication
    
    This commit implements the internal replication of purge requests. This
    part of the anit-entropy process is important for ensuring that shard
    copies continue to be eventually consistent even if updates happen to
    shards independently due to a network split or other event that prevents
    the successful purge request to a given copy.
    
    The main addition to internal replication is that we both pull and push
    purge requests between the source and target shards. The push direction
    is obvious given that internal replication is in the push direction
    already. Pull isn't quite as obvious but is required so that we don't
    push an update that was already purged on the target.
    
    Of note is that internal replication also has to maintain _local doc
    checkpoints to prevent compaction removing old purge requests or else
    shard copies could end up missing purge requests which would prevent the
    shard copies from ever reaching a consistent state.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
    Co-authored-by: jiangphcn <ji...@cn.ibm.com>
---
 .../src/cpse_test_purge_replication.erl            | 202 ++++++++++++++++++++
 src/couch_pse_tests/src/cpse_util.erl              |  32 +++-
 src/mem3/src/mem3_epi.erl                          |   3 +-
 .../src/{mem3_epi.erl => mem3_plugin_couch_db.erl} |  39 +---
 src/mem3/src/mem3_rep.erl                          | 206 +++++++++++++++++++--
 src/mem3/src/mem3_rpc.erl                          |  71 ++++++-
 6 files changed, 499 insertions(+), 54 deletions(-)

diff --git a/src/couch_pse_tests/src/cpse_test_purge_replication.erl b/src/couch_pse_tests/src/cpse_test_purge_replication.erl
new file mode 100644
index 0000000..fb09eeb
--- /dev/null
+++ b/src/couch_pse_tests/src/cpse_test_purge_replication.erl
@@ -0,0 +1,202 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(cpse_test_purge_replication).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+setup_all() ->
+    cpse_util:setup_all([mem3, fabric, couch_replicator]).
+
+
+setup_each() ->
+    {ok, Src} = cpse_util:create_db(),
+    {ok, Tgt} = cpse_util:create_db(),
+    {couch_db:name(Src), couch_db:name(Tgt)}.
+
+
+teardown_each({SrcDb, TgtDb}) ->
+    ok = couch_server:delete(SrcDb, []),
+    ok = couch_server:delete(TgtDb, []).
+
+
+cpse_purge_http_replication({Source, Target}) ->
+    {ok, Rev1} = cpse_util:save_doc(Source, {[{'_id', foo}, {vsn, 1}]}),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Source, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 1},
+        {changes, 1},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    RepObject = {[
+        {<<"source">>, Source},
+        {<<"target">>, Target}
+    ]},
+
+    {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+    {ok, Doc1} = cpse_util:open_doc(Target, foo),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Target, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 1},
+        {changes, 1},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    PurgeInfos = [
+        {cpse_util:uuid(), <<"foo">>, [Rev1]}
+    ],
+
+    {ok, [{ok, PRevs}]} = cpse_util:purge(Source, PurgeInfos),
+    ?assertEqual([Rev1], PRevs),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Source, [
+        {doc_count, 0},
+        {del_doc_count, 0},
+        {update_seq, 2},
+        {changes, 0},
+        {purge_seq, 1},
+        {purge_infos, PurgeInfos}
+    ]),
+
+    % Show that a purge on the source is
+    % not replicated to the target
+    {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+    {ok, Doc2} = cpse_util:open_doc(Target, foo),
+    [Rev2] = Doc2#doc_info.revs,
+    ?assertEqual(Rev1, Rev2#rev_info.rev),
+    ?assertEqual(Doc1, Doc2),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Target, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 1},
+        {changes, 1},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    % Show that replicating from the target
+    % back to the source reintroduces the doc
+    RepObject2 = {[
+        {<<"source">>, Target},
+        {<<"target">>, Source}
+    ]},
+
+    {ok, _} = couch_replicator:replicate(RepObject2, ?ADMIN_USER),
+    {ok, Doc3} = cpse_util:open_doc(Source, foo),
+    [Revs3] = Doc3#doc_info.revs,
+    ?assertEqual(Rev1, Revs3#rev_info.rev),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Source, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 3},
+        {changes, 1},
+        {purge_seq, 1},
+        {purge_infos, PurgeInfos}
+    ]).
+
+
+cpse_purge_internal_repl_disabled({Source, Target}) ->
+    cpse_util:with_config([{"mem3", "replicate_purges", "false"}], fun() ->
+        repl(Source, Target),
+
+        {ok, [Rev1, Rev2]} = cpse_util:save_docs(Source, [
+            {[{'_id', foo1}, {vsn, 1}]},
+            {[{'_id', foo2}, {vsn, 2}]}
+        ]),
+
+        repl(Source, Target),
+
+        PurgeInfos1 = [
+            {cpse_util:uuid(), <<"foo1">>, [Rev1]}
+        ],
+        {ok, [{ok, PRevs1}]} = cpse_util:purge(Source, PurgeInfos1),
+        ?assertEqual([Rev1], PRevs1),
+
+        PurgeInfos2 = [
+            {cpse_util:uuid(), <<"foo2">>, [Rev2]}
+        ],
+        {ok, [{ok, PRevs2}]} = cpse_util:purge(Target, PurgeInfos2),
+        ?assertEqual([Rev2], PRevs2),
+
+        SrcShard = make_shard(Source),
+        TgtShard = make_shard(Target),
+        ?assertEqual({ok, 0}, mem3_rep:go(SrcShard, TgtShard)),
+        ?assertEqual({ok, 0}, mem3_rep:go(TgtShard, SrcShard)),
+
+        ?assertMatch({ok, #doc_info{}}, cpse_util:open_doc(Source, <<"foo2">>)),
+        ?assertMatch({ok, #doc_info{}}, cpse_util:open_doc(Target, <<"foo1">>))
+    end).
+
+
+cpse_purge_repl_simple_pull({Source, Target}) ->
+    repl(Source, Target),
+
+    {ok, Rev} = cpse_util:save_doc(Source, {[{'_id', foo}, {vsn, 1}]}),
+    repl(Source, Target),
+
+    PurgeInfos = [
+        {cpse_util:uuid(), <<"foo">>, [Rev]}
+    ],
+    {ok, [{ok, PRevs}]} = cpse_util:purge(Target, PurgeInfos),
+    ?assertEqual([Rev], PRevs),
+    repl(Source, Target).
+
+
+cpse_purge_repl_simple_push({Source, Target}) ->
+    repl(Source, Target),
+
+    {ok, Rev} = cpse_util:save_doc(Source, {[{'_id', foo}, {vsn, 1}]}),
+    repl(Source, Target),
+
+    PurgeInfos = [
+        {cpse_util:uuid(), <<"foo">>, [Rev]}
+    ],
+    {ok, [{ok, PRevs}]} = cpse_util:purge(Source, PurgeInfos),
+    ?assertEqual([Rev], PRevs),
+    repl(Source, Target).
+
+
+repl(Source, Target) ->
+    SrcShard = make_shard(Source),
+    TgtShard = make_shard(Target),
+
+    ?assertEqual({ok, 0}, mem3_rep:go(SrcShard, TgtShard)),
+
+    SrcTerm = cpse_util:db_as_term(Source, replication),
+    TgtTerm = cpse_util:db_as_term(Target, replication),
+
+    Diff = cpse_util:term_diff(SrcTerm, TgtTerm),
+    ?assertEqual(nodiff, Diff).
+
+
+make_shard(DbName) ->
+    #shard{
+        name = DbName,
+        node = node(),
+        dbname = DbName,
+        range = [0, 16#FFFFFFFF]
+    }.
diff --git a/src/couch_pse_tests/src/cpse_util.erl b/src/couch_pse_tests/src/cpse_util.erl
index 9dae9a0..e042520 100644
--- a/src/couch_pse_tests/src/cpse_util.erl
+++ b/src/couch_pse_tests/src/cpse_util.erl
@@ -62,6 +62,7 @@ setup_all(ExtraApps) ->
     EngineModStr = atom_to_list(EngineMod),
     config:set("couchdb_engines", Extension, EngineModStr, false),
     config:set("log", "include_sasl", "false", false),
+    config:set("mem3", "replicate_purges", "true", false),
     Ctx.
 
 
@@ -428,17 +429,25 @@ prev_rev(#full_doc_info{} = FDI) ->
 
 
 db_as_term(Db) ->
+    db_as_term(Db, compact).
+
+db_as_term(DbName, Type) when is_binary(DbName) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        db_as_term(Db, Type)
+    end);
+
+db_as_term(Db, Type) ->
     [
-        {props, db_props_as_term(Db)},
+        {props, db_props_as_term(Db, Type)},
         {docs, db_docs_as_term(Db)},
-        {local_docs, db_local_docs_as_term(Db)},
+        {local_docs, db_local_docs_as_term(Db, Type)},
         {changes, db_changes_as_term(Db)},
         {purged_docs, db_purged_docs_as_term(Db)}
     ].
 
 
-db_props_as_term(Db) ->
-    Props = [
+db_props_as_term(Db, Type) ->
+    Props0 = [
         get_doc_count,
         get_del_doc_count,
         get_disk_version,
@@ -450,6 +459,9 @@ db_props_as_term(Db) ->
         get_uuid,
         get_epochs
     ],
+    Props = if Type /= replication -> Props0; true ->
+        Props0 -- [get_uuid]
+    end,
     lists:map(fun(Fun) ->
         {Fun, couch_db_engine:Fun(Db)}
     end, Props).
@@ -463,8 +475,16 @@ db_docs_as_term(Db) ->
     end, FDIs)).
 
 
-db_local_docs_as_term(Db) ->
-    FoldFun = fun(Doc, Acc) -> {ok, [Doc | Acc]} end,
+db_local_docs_as_term(Db, Type) ->
+    FoldFun = fun(Doc, Acc) ->
+        case Doc#doc.id of
+            <<?LOCAL_DOC_PREFIX, "purge-mem3", _/binary>>
+                when Type == replication ->
+                {ok, Acc};
+            _ ->
+                {ok, [Doc | Acc]}
+        end
+    end,
     {ok, LDocs} = couch_db:fold_local_docs(Db, FoldFun, [], []),
     lists:reverse(LDocs).
 
diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl
index ebcd596..4bf2bf5 100644
--- a/src/mem3/src/mem3_epi.erl
+++ b/src/mem3/src/mem3_epi.erl
@@ -30,7 +30,8 @@ app() ->
 
 providers() ->
     [
-         {chttpd_handlers, mem3_httpd_handlers}
+        {couch_db, mem3_plugin_couch_db},
+        {chttpd_handlers, mem3_httpd_handlers}
     ].
 
 
diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_plugin_couch_db.erl
similarity index 52%
copy from src/mem3/src/mem3_epi.erl
copy to src/mem3/src/mem3_plugin_couch_db.erl
index ebcd596..8cb5d78 100644
--- a/src/mem3/src/mem3_epi.erl
+++ b/src/mem3/src/mem3_plugin_couch_db.erl
@@ -2,7 +2,7 @@
 % use this file except in compliance with the License. You may obtain a copy of
 % the License at
 %
-% http://www.apache.org/licenses/LICENSE-2.0
+%   http://www.apache.org/licenses/LICENSE-2.0
 %
 % Unless required by applicable law or agreed to in writing, software
 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
@@ -10,41 +10,12 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
-
--module(mem3_epi).
-
--behaviour(couch_epi_plugin).
+-module(mem3_plugin_couch_db).
 
 -export([
-    app/0,
-    providers/0,
-    services/0,
-    data_subscriptions/0,
-    data_providers/0,
-    processes/0,
-    notify/3
+    is_valid_purge_client/2
 ]).
 
-app() ->
-    mem3.
-
-providers() ->
-    [
-         {chttpd_handlers, mem3_httpd_handlers}
-    ].
-
-
-services() ->
-    [].
-
-data_subscriptions() ->
-    [].
-
-data_providers() ->
-    [].
-
-processes() ->
-    [].
 
-notify(_Key, _Old, _New) ->
-    ok.
+is_valid_purge_client(DbName, Props) ->
+    mem3_rep:verify_purge_checkpoint(DbName, Props).
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index bbb0627..5a74019 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -17,6 +17,8 @@
     go/2,
     go/3,
     make_local_id/2,
+    make_purge_id/2,
+    verify_purge_checkpoint/2,
     find_source_seq/4
 ]).
 
@@ -35,6 +37,7 @@
     infos = [],
     seq = 0,
     localid,
+    purgeid,
     source,
     target,
     filter,
@@ -118,6 +121,40 @@ make_local_id(SourceThing, TargetThing, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
 
+make_purge_id(SourceUUID, TargetUUID) ->
+    <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+
+
+verify_purge_checkpoint(DbName, Props) ->
+    try
+        Type = couch_util:get_value(<<"type">>, Props),
+        if Type =/= <<"internal_replication">> -> false; true ->
+            SourceBin = couch_util:get_value(<<"source">>, Props),
+            TargetBin = couch_util:get_value(<<"target">>, Props),
+            Range = couch_util:get_value(<<"range">>, Props),
+
+            Source = binary_to_existing_atom(SourceBin, latin1),
+            Target = binary_to_existing_atom(TargetBin, latin1),
+
+            try
+                Shards = mem3:shards(DbName),
+                Nodes = lists:foldl(fun(Shard, Acc) ->
+                    case Shard#shard.range == Range of
+                        true -> [Shard#shard.node | Acc];
+                        false -> Acc
+                    end
+                end, [], mem3:shards(DbName)),
+                lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
+            catch
+                error:database_does_not_exist ->
+                    false
+            end
+        end
+    catch _:_ ->
+        false
+    end.
+
+
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
 %%
@@ -169,20 +206,132 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
     end.
 
 
-repl(#acc{db = Db} = Acc0) ->
-    erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
-    #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0),
-    case Seq >= couch_db:get_update_seq(Db) of
-        true ->
-            {ok, 0};
-        false ->
-            Fun = fun ?MODULE:changes_enumerator/2,
-            {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
-            {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
-            {ok, couch_db:count_changes_since(Db, LastSeq)}
+repl(#acc{db = Db0} = Acc0) ->
+    erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}),
+    Acc1 = calculate_start_seq(Acc0),
+    try
+        Acc3 = case config:get_boolean("mem3", "replicate_purges", false) of
+            true ->
+                Acc2 = pull_purges(Acc1),
+                push_purges(Acc2);
+            false ->
+                Acc1
+        end,
+        push_changes(Acc3)
+    catch
+        throw:{finished, Count} ->
+            {ok, Count}
     end.
 
 
+pull_purges(#acc{} = Acc0) ->
+    #acc{
+        batch_size = Count,
+        seq = UpdateSeq,
+        target = Target
+    } = Acc0,
+    #shard{
+        node = TgtNode,
+        name = TgtDbName
+    } = Target,
+
+    with_src_db(Acc0, fun(Db) ->
+        SrcUUID = couch_db:get_uuid(Db),
+        {LocalPurgeId, Infos, ThroughSeq, Remaining} =
+                mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
+
+        if Infos == [] -> ok; true ->
+            {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_edits]),
+            Body = purge_cp_body(Acc0, ThroughSeq),
+            mem3_rpc:save_purge_checkpoint(
+                    TgtNode, TgtDbName, LocalPurgeId, Body)
+        end,
+
+        if Remaining =< 0 -> ok; true ->
+            PurgeSeq = couch_db:get_purge_seq(Db),
+            OldestPurgeSeq = couch_db:get_oldest_purge_seq(Db),
+            PurgesToPush = PurgeSeq - OldestPurgeSeq,
+            Changes = couch_db:count_changes_since(Db, UpdateSeq),
+            throw({finished, Remaining + PurgesToPush + Changes})
+        end,
+
+        Acc0#acc{purgeid = LocalPurgeId}
+    end).
+
+
+push_purges(#acc{} = Acc0) ->
+    #acc{
+        batch_size = BatchSize,
+        purgeid = LocalPurgeId,
+        seq = UpdateSeq,
+        target = Target
+    } = Acc0,
+    #shard{
+        node = TgtNode,
+        name = TgtDbName
+    } = Target,
+
+    with_src_db(Acc0, fun(Db) ->
+        StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of
+            {ok, #doc{body = {Props}}} ->
+                couch_util:get_value(<<"purge_seq">>, Props);
+            {not_found, _} ->
+                Oldest = couch_db:get_oldest_purge_seq(Db),
+                erlang:max(0, Oldest - 1)
+        end,
+
+        FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
+            NewCount = Count + length(Revs),
+            NewInfos = [{UUID, Id, Revs} | Infos],
+            Status = if NewCount < BatchSize -> ok; true -> stop end,
+            {Status, {NewCount, NewInfos, PSeq}}
+        end,
+        InitAcc = {0, [], StartSeq},
+        {ok, {_, Infos, ThroughSeq}} =
+            couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
+
+        if Infos == [] -> ok; true ->
+            ok = purge_on_target(TgtNode, TgtDbName, Infos),
+            Doc = #doc{
+                id = LocalPurgeId,
+                body = purge_cp_body(Acc0, ThroughSeq)
+            },
+            {ok, _} = couch_db:update_doc(Db, Doc, [])
+        end,
+
+        PurgeSeq = couch_db:get_purge_seq(Db),
+        if ThroughSeq >= PurgeSeq -> ok; true ->
+            Remaining = PurgeSeq - ThroughSeq,
+            Changes = couch_db:count_changes_since(Db, UpdateSeq),
+            throw({finished, Remaining + Changes})
+        end,
+
+        Acc0
+    end).
+
+
+push_changes(#acc{} = Acc0) ->
+    #acc{
+        db = Db0,
+        seq = Seq
+    } = Acc0,
+
+    % Avoid needless rewriting the internal replication
+    % checkpoint document if nothing is replicated.
+    UpdateSeq = couch_db:get_update_seq(Db0),
+    if Seq < UpdateSeq -> ok; true ->
+        throw({finished, 0})
+    end,
+
+    with_src_db(Acc0, fun(Db) ->
+        Acc1 = Acc0#acc{db = Db},
+        Fun = fun ?MODULE:changes_enumerator/2,
+        {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
+        {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
+        {ok, couch_db:count_changes_since(Db, LastSeq)}
+    end).
+
+
 calculate_start_seq(Acc) ->
     #acc{
         db = Db,
@@ -323,6 +472,15 @@ save_on_target(Node, Name, Docs) ->
     ok.
 
 
+purge_on_target(Node, Name, PurgeInfos) ->
+    mem3_rpc:purge_docs(Node, Name, PurgeInfos, [
+        replicated_changes,
+        full_commit,
+        ?ADMIN_CTX,
+        {io_priority, {internal_repl, Name}}
+    ]),
+    ok.
+
 update_locals(Acc) ->
     #acc{seq=Seq, db=Db, target=Target, localid=Id, history=History} = Acc,
     #shard{name=Name, node=Node} = Target,
@@ -336,6 +494,23 @@ update_locals(Acc) ->
     {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 
+purge_cp_body(#acc{} = Acc, PurgeSeq) ->
+    #acc{
+        source = Source,
+        target = Target
+    } = Acc,
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    {[
+        {<<"type">>, <<"internal_replication">>},
+        {<<"updated_on">>, NowSecs},
+        {<<"purge_seq">>, PurgeSeq},
+        {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
+        {<<"target">>, atom_to_binary(Target#shard.node, latin1)},
+        {<<"range">>, Source#shard.range}
+    ]}.
+
+
 find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     SrcUUID = couch_db:get_uuid(SrcDb),
     S = couch_util:encodeBase64Url(crypto:hash(md5, term_to_binary(SrcUUID))),
@@ -366,6 +541,15 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     end.
 
 
+with_src_db(#acc{source = Source}, Fun) ->
+    {ok, Db} = couch_db:open(Source#shard.name, [?ADMIN_CTX]),
+    try
+        Fun(Db)
+    after
+        couch_db:close(Db)
+    end.
+
+
 is_prefix(Prefix, Subject) ->
     binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
 
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index c2bd58f..35d1d0a 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -20,14 +20,21 @@
     get_missing_revs/4,
     update_docs/4,
     load_checkpoint/4,
-    save_checkpoint/6
+    save_checkpoint/6,
+
+    load_purge_infos/4,
+    save_purge_checkpoint/4,
+    purge_docs/4
 ]).
 
 % Private RPC callbacks
 -export([
     find_common_seq_rpc/3,
     load_checkpoint_rpc/3,
-    save_checkpoint_rpc/5
+    save_checkpoint_rpc/5,
+
+    load_purge_infos_rpc/3,
+    save_purge_checkpoint_rpc/3
 ]).
 
 
@@ -58,6 +65,20 @@ find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
     rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
 
 
+load_purge_infos(Node, DbName, SourceUUID, Count) ->
+    Args = [DbName, SourceUUID, Count],
+    rexi_call(Node, {mem3_rpc, load_purge_infos_rpc, Args}).
+
+
+save_purge_checkpoint(Node, DbName, PurgeDocId, Body) ->
+    Args = [DbName, PurgeDocId, Body],
+    rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
+
+
+purge_docs(Node, DbName, PurgeInfos, Options) ->
+    rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}).
+
+
 load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
@@ -128,6 +149,52 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
     end.
 
 
+load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            TgtUUID = couch_db:get_uuid(Db),
+            PurgeDocId = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
+            StartSeq = case couch_db:open_doc(Db, PurgeDocId, []) of
+                {ok, #doc{body = {Props}}} ->
+                    couch_util:get_value(<<"purge_seq">>, Props);
+                {not_found, _} ->
+                    Oldest = couch_db:get_oldest_purge_seq(Db),
+                    erlang:max(0, Oldest - 1)
+            end,
+            FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
+                NewCount = Count + length(Revs),
+                NewInfos = [{UUID, Id, Revs} | Infos],
+                Status = if NewCount < BatchSize -> ok; true -> stop end,
+                {Status, {NewCount, NewInfos, PSeq}}
+            end,
+            InitAcc = {0, [], StartSeq},
+            {ok, {_, PurgeInfos, ThroughSeq}} =
+                    couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
+            PurgeSeq = couch_db:get_purge_seq(Db),
+            Remaining = PurgeSeq - ThroughSeq,
+            rexi:reply({ok, {PurgeDocId, PurgeInfos, ThroughSeq, Remaining}});
+        Else ->
+            rexi:reply(Else)
+    end.
+
+
+save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            Doc = #doc{id = PurgeDocId, body = Body},
+            Resp = try couch_db:update_doc(Db, Doc, []) of
+                Resp0 -> Resp0
+            catch T:R ->
+                {T, R}
+            end,
+            rexi:reply(Resp);
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
 %% @doc Return the sequence where two files with the same UUID diverged.
 compare_epochs(SourceEpochs, TargetEpochs) ->
     compare_rev_epochs(


[couchdb] 05/06: [09/10] Clustered Purge: Fabric API

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f3fddccc2fa49ebe0cd53153507e1cd7dc23e7c2
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Apr 24 12:27:43 2018 -0500

    [09/10] Clustered Purge: Fabric API
    
    This commit implements the clustered API for performing purge requests.
    This change should be a fairly straightforward change for anyone already
    familiar with the general implementation of a fabric coordinator given
    that the purge API is fairly simple.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
    Co-authored-by: jiangphcn <ji...@cn.ibm.com>
---
 src/fabric/src/fabric.erl           |  27 +-
 src/fabric/src/fabric_db_info.erl   |  29 +-
 src/fabric/src/fabric_db_meta.erl   |  26 +-
 src/fabric/src/fabric_doc_purge.erl | 572 ++++++++++++++++++++++++++++++++++++
 4 files changed, 638 insertions(+), 16 deletions(-)

diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 4a07271..b2e71dc 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -21,12 +21,13 @@
     delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
     set_security/2, set_security/3, get_revs_limit/1, get_security/1,
     get_security/2, get_all_security/1, get_all_security/2,
+    get_purge_infos_limit/1, set_purge_infos_limit/3,
     compact/1, compact/2]).
 
 % Documents
 -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
     get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3,
-    purge_docs/2, att_receiver/2]).
+    purge_docs/3, att_receiver/2]).
 
 % Views
 -export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
@@ -137,6 +138,18 @@ set_security(DbName, SecObj) ->
 set_security(DbName, SecObj, Options) ->
     fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
 
+%% @doc sets the upper bound for the number of stored purge requests
+-spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) -> ok.
+set_purge_infos_limit(DbName, Limit, Options)
+        when is_integer(Limit), Limit > 0 ->
+    fabric_db_meta:set_purge_infos_limit(dbname(DbName), Limit, opts(Options)).
+
+%% @doc retrieves the upper bound for the number of stored purge requests
+-spec get_purge_infos_limit(dbname()) -> pos_integer() | no_return().
+get_purge_infos_limit(DbName) ->
+    {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+    try couch_db:get_purge_infos_limit(Db) after catch couch_db:close(Db) end.
+
 get_security(DbName) ->
     get_security(DbName, [?ADMIN_CTX]).
 
@@ -267,8 +280,16 @@ update_docs(DbName, Docs, Options) ->
         {aborted, PreCommitFailures}
     end.
 
-purge_docs(_DbName, _IdsRevs) ->
-    not_implemented.
+
+%% @doc purge revisions for a list '{Id, Revs}'
+%%      returns {ok, {PurgeSeq, Results}}
+-spec purge_docs(dbname(), [{docid(), [revision()]}], [option()]) ->
+    {ok, [{Health, [revision()]}] | {error, any()}} when
+    Health :: ok | accepted.
+purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+    IdsRevs2 = [idrevs(IdRs) || IdRs <- IdsRevs],
+    fabric_doc_purge:go(dbname(DbName), IdsRevs2, opts(Options)).
+
 
 %% @doc spawns a process to upload attachment data and
 %%      returns a function that shards can use to communicate
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index 98e8e52..97a31c2 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -23,10 +23,12 @@ go(DbName) ->
     RexiMon = fabric_util:create_monitors(Shards),
     Fun = fun handle_message/3,
     {ok, ClusterInfo} = get_cluster_info(Shards),
-    Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]},
+    Acc0 = {fabric_dict:init(Workers, nil), [], [{cluster, ClusterInfo}]},
     try
         case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
-            {ok, Acc} -> {ok, Acc};
+
+            {ok, Acc} ->
+                {ok, Acc};
             {timeout, {WorkersDict, _}} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
                     WorkersDict,
@@ -37,44 +39,49 @@ go(DbName) ->
                     "get_db_info"
                 ),
                 {error, timeout};
-            {error, Error} -> throw(Error)
+            {error, Error} ->
+                throw(Error)
         end
     after
         rexi_monitor:stop(RexiMon)
     end.
 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+handle_message({rexi_DOWN,
+        _, {_,NodeRef},_}, _Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_util:remove_down_workers(Counters, NodeRef) of
     {ok, NewCounters} ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     error ->
         {error, {nodedown, <<"progress not possible">>}}
     end;
 
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, PseqAcc, Acc}) ->
     NewCounters = fabric_dict:erase(Shard, Counters),
     case fabric_view:is_progress_possible(NewCounters) of
     true ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     false ->
         {error, Reason}
     end;
 
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_dict:lookup_element(Shard, Counters) of
     undefined ->
         % already heard from someone else in this range
-        {ok, {Counters, Acc}};
+        {ok, {Counters, PseqAcc, Acc}};
     nil ->
         Seq = couch_util:get_value(update_seq, Info),
         C1 = fabric_dict:store(Shard, Seq, Counters),
         C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+        PSeq = couch_util:get_value(purge_seq, Info),
+        NewPseqAcc = [{Shard, PSeq}|PseqAcc],
         case fabric_dict:any(nil, C2) of
         true ->
-            {ok, {C2, [Info|Acc]}};
+            {ok, {C2, NewPseqAcc, [Info|Acc]}};
         false ->
             {stop, [
                 {db_name,Name},
+                {purge_seq, fabric_view_changes:pack_seqs(NewPseqAcc)},
                 {update_seq, fabric_view_changes:pack_seqs(C2)} |
                 merge_results(lists:flatten([Info|Acc]))
             ]}
@@ -91,8 +98,6 @@ merge_results(Info) ->
             [{doc_count, lists:sum(X)} | Acc];
         (doc_del_count, X, Acc) ->
             [{doc_del_count, lists:sum(X)} | Acc];
-        (purge_seq, X, Acc) ->
-            [{purge_seq, lists:sum(X)} | Acc];
         (compact_running, X, Acc) ->
             [{compact_running, lists:member(true, X)} | Acc];
         (disk_size, X, Acc) -> % legacy
diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl
index 367ef06..26e1b37 100644
--- a/src/fabric/src/fabric_db_meta.erl
+++ b/src/fabric/src/fabric_db_meta.erl
@@ -12,7 +12,8 @@
 
 -module(fabric_db_meta).
 
--export([set_revs_limit/3, set_security/3, get_all_security/2]).
+-export([set_revs_limit/3, set_security/3, get_all_security/2,
+    set_purge_infos_limit/3]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -48,6 +49,29 @@ handle_revs_message(Error, _, _Acc) ->
     {error, Error}.
 
 
+set_purge_infos_limit(DbName, Limit, Options) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, set_purge_infos_limit, [Limit, Options]),
+    Handler = fun handle_purge_message/3,
+    Acc0 = {Workers, length(Workers) - 1},
+    case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of
+        {ok, ok} ->
+            ok;
+        {timeout, {DefunctWorkers, _}} ->
+            fabric_util:log_timeout(DefunctWorkers, "set_purged_docs_limit"),
+            {error, timeout};
+        Error ->
+            Error
+    end.
+
+handle_purge_message(ok, _, {_Workers, 0}) ->
+    {stop, ok};
+handle_purge_message(ok, Worker, {Workers, Waiting}) ->
+    {ok, {lists:delete(Worker, Workers), Waiting - 1}};
+handle_purge_message(Error, _, _Acc) ->
+    {error, Error}.
+
+
 set_security(DbName, SecObj, Options) ->
     Shards = mem3:shards(DbName),
     RexiMon = fabric_util:create_monitors(Shards),
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl
new file mode 100644
index 0000000..2571d0d
--- /dev/null
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -0,0 +1,572 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_purge).
+
+
+-export([
+    go/3
+]).
+
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-record(acc, {
+    worker_uuids,
+    resps,
+    uuid_counts,
+    w
+}).
+
+
+go(_, [], _) ->
+    {ok, []};
+go(DbName, IdsRevs, Options) ->
+    % Generate our purge requests of {UUID, DocId, Revs}
+    {UUIDs, Reqs} = create_reqs(IdsRevs, [], []),
+
+    % Fire off rexi workers for each shard.
+    {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
+        #shard{name = ShardDbName, node = Node} = Shard,
+        Args = [ShardDbName, ShardReqs, Options],
+        Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}),
+        Worker = Shard#shard{ref=Ref},
+        ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs],
+        {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
+    end, {[], []}, group_reqs_by_shard(DbName, Reqs)),
+
+    UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) ->
+        lists:foldl(fun(UUID, InnerCountAcc) ->
+            dict:update_counter(UUID, 1, InnerCountAcc)
+        end, CountAcc, WUUIDs)
+    end, dict:new(), WorkerUUIDs),
+
+    RexiMon = fabric_util:create_monitors(Workers),
+    Timeout = fabric_util:request_timeout(),
+    Acc0 = #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+        uuid_counts = UUIDCounts,
+        w = w(DbName, Options)
+    },
+    Acc2 = try rexi_utils:recv(Workers, #shard.ref,
+            fun handle_message/3, Acc0, infinity, Timeout) of
+        {ok, Acc1} ->
+            Acc1;
+        {timeout, Acc1} ->
+            #acc{
+                worker_uuids = WorkerUUIDs,
+                resps = Resps
+            } = Acc1,
+            DefunctWorkers = [Worker || {Worker, _} <- WorkerUUIDs],
+            fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+            NewResps = append_errors(timeout, WorkerUUIDs, Resps),
+            Acc1#acc{worker_uuids = [], resps = NewResps};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end,
+
+    FinalResps = format_resps(UUIDs, Acc2),
+    {resp_health(FinalResps), FinalResps}.
+
+
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    Pred = fun({#shard{node = N}, _}) -> N == Node end,
+    {Failed, Rest} = lists:partition(Pred, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, Failed, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({rexi_EXIT, _}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, [WorkerPair], Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({ok, Replies}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_resps(UUIDs, Replies, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({bad_request, Msg}, _, _) ->
+    throw({bad_request, Msg}).
+
+
+create_reqs([], UUIDs, Reqs) ->
+    {lists:reverse(UUIDs), lists:reverse(Reqs)};
+
+create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) ->
+    UUID = couch_uuids:new(),
+    NewUUIDs = [UUID | UUIDs],
+    NewReqs = [{UUID, Id, Revs} | Reqs],
+    create_reqs(RestIdsRevs, NewUUIDs, NewReqs).
+
+
+group_reqs_by_shard(DbName, Reqs) ->
+    lists:foldl(fun({_UUID, Id, _Revs} = Req, D0) ->
+        lists:foldl(fun(Shard, D1) ->
+            dict:append(Shard, Req, D1)
+        end, D0, mem3:shards(DbName, Id))
+    end, dict:new(), Reqs).
+
+
+w(DbName, Options) ->
+    try
+        list_to_integer(couch_util:get_value(w, Options))
+    catch _:_ ->
+        mem3:quorum(DbName)
+    end.
+
+
+append_errors(Type, WorkerUUIDs, Resps) ->
+    lists:foldl(fun({_Worker, UUIDs}, RespAcc) ->
+        Errors = [{error, Type} || _UUID <- UUIDs],
+        append_resps(UUIDs, Errors, RespAcc)
+    end, Resps, WorkerUUIDs).
+
+
+append_resps([], [], Resps) ->
+    Resps;
+append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) ->
+    NewResps = dict:append(UUID, Reply, Resps),
+    append_resps(RestUUIDs, RestReplies, NewResps).
+
+
+maybe_stop(#acc{worker_uuids = []} = Acc) ->
+    {stop, Acc};
+maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) ->
+    try
+        dict:fold(fun(UUID, UUIDResps, _) ->
+            UUIDCount = dict:fetch(UUID, Counts),
+            case has_quorum(UUIDResps, UUIDCount, W) of
+                true -> ok;
+                false -> throw(keep_going)
+            end
+        end, nil, Resps),
+        {stop, Acc}
+    catch throw:keep_going ->
+        {ok, Acc}
+    end.
+
+
+format_resps(UUIDs, #acc{} = Acc) ->
+    #acc{
+        resps = Resps,
+        w = W
+    } = Acc,
+    FoldFun = fun(UUID, Replies, ReplyAcc) ->
+        OkReplies = [Reply || {ok, Reply} <- Replies],
+        case OkReplies of
+            [] ->
+                [Error | _] = lists:usort(Replies),
+                [{UUID, Error} | ReplyAcc];
+            _ ->
+                AllRevs = lists:usort(lists:flatten(OkReplies)),
+                IsOk = length(OkReplies) >= W
+                        andalso length(lists:usort(OkReplies)) == 1,
+                Health = if IsOk -> ok; true -> accepted end,
+                [{UUID, {Health, AllRevs}} | ReplyAcc]
+        end
+    end,
+    FinalReplies = dict:fold(FoldFun, {ok, []}, Resps),
+    couch_util:reorder_results(UUIDs, FinalReplies);
+
+format_resps(_UUIDs, Else) ->
+    Else.
+
+
+resp_health(Resps) ->
+    Healths = lists:usort([H || {H, _} <- Resps]),
+    HasError = lists:member(error, Healths),
+    HasAccepted = lists:member(accepted, Healths),
+    AllOk = Healths == [ok],
+    if
+        HasError -> error;
+        HasAccepted -> accepted;
+        AllOk -> ok;
+        true -> error
+    end.
+
+
+has_quorum(Resps, Count, W) ->
+    OkResps = [R || {ok, _} = R <- Resps],
+    OkCounts = lists:foldl(fun(R, Acc) ->
+        orddict:update_counter(R, 1, Acc)
+    end, orddict:new(), OkResps),
+    MaxOk = lists:max([0 | element(2, lists:unzip(OkCounts))]),
+    if
+        MaxOk >= W -> true;
+        length(Resps) >= Count -> true;
+        true -> false
+    end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+purge_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_w2_ok(),
+            t_w3_ok(),
+
+            t_w2_mixed_accepted(),
+            t_w3_mixed_accepted(),
+
+            t_w2_exit1_ok(),
+            t_w2_exit2_accepted(),
+            t_w2_exit3_error(),
+
+            t_w4_accepted(),
+
+            t_mixed_ok_accepted(),
+            t_mixed_errors()
+        ]
+    }.
+
+
+setup() ->
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_, _) -> ok end),
+    meck:expect(couch_log, notice, fun(_, _) -> ok end).
+
+
+teardown(_) ->
+    meck:unload().
+
+
+t_w2_ok() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, true),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(ok, resp_health(Resps))
+    end).
+
+
+t_w3_ok() ->
+    ?_test(begin
+        Acc0 = create_init_acc(3),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(ok, resp_health(Resps))
+    end).
+
+
+t_w2_mixed_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [
+            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+        ],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_w3_mixed_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(3),
+        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [
+            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+        ],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_w2_exit1_ok() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(ok, resp_health(Resps))
+    end).
+
+
+t_w2_exit2_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_w2_exit3_error() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [
+            {error, internal_server_error},
+            {error, internal_server_error}
+        ],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(error, resp_health(Resps))
+    end).
+
+
+t_w4_accepted() ->
+    % Make sure we return when all workers have responded
+    % rather than wait around for a timeout if a user asks
+    % for a qourum with more than the available number of
+    % shards.
+    ?_test(begin
+        Acc0 = create_init_acc(4),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_mixed_ok_accepted() ->
+    ?_test(begin
+        WorkerUUIDs = [
+            {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+            {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+        ],
+
+        Acc0 = #acc{
+            worker_uuids = WorkerUUIDs,
+            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+            w = 2
+        },
+
+        Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]},
+        Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1),
+        {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+        {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+        {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_mixed_errors() ->
+    ?_test(begin
+        WorkerUUIDs = [
+            {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+            {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+        ],
+
+        Acc0 = #acc{
+            worker_uuids = WorkerUUIDs,
+            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+            w = 2
+        },
+
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+        {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+        {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(error, resp_health(Resps))
+    end).
+
+
+create_init_acc(W) ->
+    UUID1 = <<"uuid1">>,
+    UUID2 = <<"uuid2">>,
+
+    Nodes = [node1, node2, node3],
+    Shards = mem3_util:create_partition_map(<<"foo">>, 3, 1, Nodes),
+
+    % Create our worker_uuids. We're relying on the fact that
+    % we're using a fake Q=1 db so we don't have to worry
+    % about any hashing here.
+    WorkerUUIDs = lists:map(fun(Shard) ->
+        {Shard#shard{ref = erlang:make_ref()}, [UUID1, UUID2]}
+    end, Shards),
+
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = dict:from_list([{UUID1, []}, {UUID2, []}]),
+        uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]),
+        w = W
+    }.
+
+
+worker(N, #acc{worker_uuids = WorkerUUIDs}) ->
+    {Worker, _} = lists:nth(N, WorkerUUIDs),
+    Worker.
+
+
+check_quorum(Acc, Expect) ->
+    dict:fold(fun(_Shard, Resps, _) ->
+        ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w))
+    end, nil, Acc#acc.resps).
+
+-endif.


[couchdb] 04/06: [08/10] Clustered Purge: Update read-repair

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a72f60ea4143f95fec479f98b9cf90f2b1309d75
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed May 30 17:26:02 2018 -0500

    [08/10] Clustered Purge: Update read-repair
    
    Read-repair needs to know which nodes have requested an update to a
    local doc so that it can determine if the update is applied. The basic
    idea here is that we may have gotten an update from a remote node that
    has yet to apply a purge request. If the local node were to apply this
    update it would effectively undo a succesful purge request.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
    Co-authored-by: jiangphcn <ji...@cn.ibm.com>
---
 src/fabric/src/fabric_doc_open.erl         |  73 +++++++-
 src/fabric/src/fabric_doc_open_revs.erl    | 262 +++++++++++++++++++++-----
 src/fabric/src/fabric_rpc.erl              | 128 ++++++++++++-
 src/fabric/test/fabric_rpc_purge_tests.erl | 285 +++++++++++++++++++++++++++++
 4 files changed, 692 insertions(+), 56 deletions(-)

diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
index 93f73a8..0a85346 100644
--- a/src/fabric/src/fabric_doc_open.erl
+++ b/src/fabric/src/fabric_doc_open.erl
@@ -25,6 +25,7 @@
     r,
     state,
     replies,
+    node_revs = [],
     q_reply
 }).
 
@@ -83,7 +84,13 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
     end;
 handle_message(Reply, Worker, Acc) ->
     NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
-    NewAcc = Acc#acc{replies = NewReplies},
+    NewNodeRevs = case Reply of
+        {ok, #doc{revs = {Pos, [Rev | _]}}} ->
+            [{Worker#shard.node, [{Pos, Rev}]} | Acc#acc.node_revs];
+        _ ->
+            Acc#acc.node_revs
+    end,
+    NewAcc = Acc#acc{replies = NewReplies, node_revs = NewNodeRevs},
     case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
     {true, QuorumReply} ->
         fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -122,14 +129,14 @@ is_r_met(Workers, Replies, R) ->
         no_more_workers
     end.
 
-read_repair(#acc{dbname=DbName, replies=Replies}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, node_revs=NodeRevs}) ->
     Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
     case Docs of
     % omit local docs from read repair
     [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
         choose_reply(Docs);
     [#doc{id=Id} | _] ->
-        Opts = [replicated_changes, ?ADMIN_CTX],
+        Opts = [?ADMIN_CTX, {read_repair, NodeRevs}],
         Res = fabric:update_docs(DbName, Docs, Opts),
         case Res of
             {ok, []} ->
@@ -205,6 +212,7 @@ open_doc_test_() ->
             t_handle_message_down(),
             t_handle_message_exit(),
             t_handle_message_reply(),
+            t_store_node_revs(),
             t_read_repair(),
             t_handle_response_quorum_met(),
             t_get_doc_info()
@@ -397,6 +405,65 @@ t_handle_message_reply() ->
     end).
 
 
+t_store_node_revs() ->
+    W1 = #shard{node = w1, ref = erlang:make_ref()},
+    W2 = #shard{node = w2, ref = erlang:make_ref()},
+    W3 = #shard{node = w3, ref = erlang:make_ref()},
+    Foo1 = {ok, #doc{id = <<"bar">>, revs = {1, [<<"foo">>]}}},
+    Foo2 = {ok, #doc{id = <<"bar">>, revs = {2, [<<"foo2">>, <<"foo">>]}}},
+    NFM = {not_found, missing},
+
+    InitAcc = #acc{workers = [W1, W2, W3], replies = [], r = 2},
+
+    ?_test(begin
+        meck:expect(rexi, kill, fun(_, _) -> ok end),
+
+        % Simple case
+        {ok, #acc{node_revs = NodeRevs1}} = handle_message(Foo1, W1, InitAcc),
+        ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs1),
+
+        % Make sure we only hold the head rev
+        {ok, #acc{node_revs = NodeRevs2}} = handle_message(Foo2, W1, InitAcc),
+        ?assertEqual([{w1, [{2, <<"foo2">>}]}], NodeRevs2),
+
+        % Make sure we don't capture anything on error
+        {ok, #acc{node_revs = NodeRevs3}} = handle_message(NFM, W1, InitAcc),
+        ?assertEqual([], NodeRevs3),
+
+        % Make sure we accumulate node revs
+        Acc1 = InitAcc#acc{node_revs = [{w1, [{1, <<"foo">>}]}]},
+        {ok, #acc{node_revs = NodeRevs4}} = handle_message(Foo2, W2, Acc1),
+        ?assertEqual(
+                [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
+                NodeRevs4
+            ),
+
+        % Make sure rexi_DOWN doesn't modify node_revs
+        Down = {rexi_DOWN, nil, {nil, w1}, nil},
+        {ok, #acc{node_revs = NodeRevs5}} = handle_message(Down, W2, Acc1),
+        ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs5),
+
+        % Make sure rexi_EXIT doesn't modify node_revs
+        Exit = {rexi_EXIT, reason},
+        {ok, #acc{node_revs = NodeRevs6}} = handle_message(Exit, W2, Acc1),
+        ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs6),
+
+        % Make sure an error doesn't remove any node revs
+        {ok, #acc{node_revs = NodeRevs7}} = handle_message(NFM, W2, Acc1),
+        ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs7),
+
+        % Make sure we have all of our node_revs when meeting
+        % quorum
+        {ok, Acc2} = handle_message(Foo1, W1, InitAcc),
+        {ok, Acc3} = handle_message(Foo2, W2, Acc2),
+        {stop, Acc4} = handle_message(NFM, W3, Acc3),
+        ?assertEqual(
+                [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
+                Acc4#acc.node_revs
+            )
+    end).
+
+
 t_read_repair() ->
     Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
     Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl
index 096722f..234b108 100644
--- a/src/fabric/src/fabric_doc_open_revs.erl
+++ b/src/fabric/src/fabric_doc_open_revs.erl
@@ -29,6 +29,7 @@
     revs,
     latest,
     replies = [],
+    node_revs = [],
     repair = false
 }).
 
@@ -82,6 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         worker_count = WorkerCount,
         workers = Workers,
         replies = PrevReplies,
+        node_revs = PrevNodeRevs,
         r = R,
         revs = Revs,
         latest = Latest,
@@ -92,7 +94,6 @@ handle_message({ok, RawReplies}, Worker, State) ->
     IsTree = Revs == all orelse Latest,
 
     % Do not count error replies when checking quorum
-
     RealReplyCount = ReplyCount + 1 - ReplyErrorCount,
     QuorumReplies = RealReplyCount >= R,
     {NewReplies, QuorumMet, Repair} = case IsTree of
@@ -102,11 +103,23 @@ handle_message({ok, RawReplies}, Worker, State) ->
             NumLeafs = couch_key_tree:count_leafs(PrevReplies),
             SameNumRevs = length(RawReplies) == NumLeafs,
             QMet = AllInternal andalso SameNumRevs andalso QuorumReplies,
-            {NewReplies0, QMet, Repair0};
+            % Don't set repair=true on the first reply
+            {NewReplies0, QMet, (ReplyCount > 0) and Repair0};
         false ->
             {NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies),
             {NewReplies0, MinCount >= R, false}
     end,
+    NewNodeRevs = if Worker == nil -> PrevNodeRevs; true ->
+        IdRevs = lists:foldl(fun
+            ({ok, #doc{revs = {Pos, [Rev | _]}}}, Acc) ->
+                [{Pos, Rev} | Acc];
+            (_, Acc) ->
+                Acc
+        end, [], RawReplies),
+        if IdRevs == [] -> PrevNodeRevs; true ->
+            [{Worker#shard.node, IdRevs} | PrevNodeRevs]
+        end
+    end,
 
     Complete = (ReplyCount =:= (WorkerCount - 1)),
 
@@ -117,6 +130,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
                     DbName,
                     IsTree,
                     NewReplies,
+                    NewNodeRevs,
                     ReplyCount + 1,
                     InRepair orelse Repair
                 ),
@@ -124,6 +138,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
         false ->
             {ok, State#state{
                 replies = NewReplies,
+                node_revs = NewNodeRevs,
                 reply_count = ReplyCount + 1,
                 workers = lists:delete(Worker, Workers),
                 repair = InRepair orelse Repair
@@ -180,7 +195,7 @@ dict_replies(Dict, [Reply | Rest]) ->
     dict_replies(NewDict, Rest).
 
 
-maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
+maybe_read_repair(Db, IsTree, Replies, NodeRevs, ReplyCount, DoRepair) ->
     Docs = case IsTree of
         true -> tree_repair_docs(Replies, DoRepair);
         false -> dict_repair_docs(Replies, ReplyCount)
@@ -189,7 +204,7 @@ maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
         [] ->
             ok;
         _ ->
-            erlang:spawn(fun() -> read_repair(Db, Docs) end)
+            erlang:spawn(fun() -> read_repair(Db, Docs, NodeRevs) end)
     end.
 
 
@@ -208,8 +223,9 @@ dict_repair_docs(Replies, ReplyCount) ->
     end.
 
 
-read_repair(Db, Docs) ->
-    Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]),
+read_repair(Db, Docs, NodeRevs) ->
+    Opts = [?ADMIN_CTX, {read_repair, NodeRevs}],
+    Res = fabric:update_docs(Db, Docs, Opts),
     case Res of
         {ok, []} ->
             couch_stats:increment_counter([fabric, read_repairs, success]);
@@ -268,20 +284,24 @@ filter_reply(Replies) ->
 setup() ->
     config:start_link([]),
     meck:new([fabric, couch_stats, couch_log]),
+    meck:new(fabric_util, [passthrough]),
     meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
     meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
-    meck:expect(couch_log, notice, fun(_, _) -> ok end).
+    meck:expect(couch_log, notice, fun(_, _) -> ok end),
+    meck:expect(fabric_util, cleanup, fun(_) -> ok end).
+
 
 
 teardown(_) ->
-    (catch meck:unload([fabric, couch_stats, couch_log])),
+    (catch meck:unload([fabric, couch_stats, couch_log, fabric_util])),
     config:stop().
 
 
 state0(Revs, Latest) ->
     #state{
         worker_count = 3,
-        workers = [w1, w2, w3],
+        workers =
+            [#shard{node='node1'}, #shard{node='node2'}, #shard{node='node3'}],
         r = 2,
         revs = Revs,
         latest = Latest
@@ -321,6 +341,14 @@ open_doc_revs_test_() ->
             check_worker_error_skipped(),
             check_quorum_only_counts_valid_responses(),
             check_empty_list_when_no_workers_reply(),
+            check_node_rev_stored(),
+            check_node_rev_store_head_only(),
+            check_node_rev_store_multiple(),
+            check_node_rev_dont_store_errors(),
+            check_node_rev_store_non_errors(),
+            check_node_rev_store_concatenate(),
+            check_node_rev_store_concantenate_multiple(),
+            check_node_rev_unmodified_on_down_or_exit(),
             check_not_found_replies_are_removed_when_doc_found(),
             check_not_found_returned_when_one_of_docs_not_found(),
             check_not_found_returned_when_doc_not_found()
@@ -334,27 +362,35 @@ open_doc_revs_test_() ->
 check_empty_response_not_quorum() ->
     % Simple smoke test that we don't think we're
     % done with a first empty response
+    W1 = #shard{node='node1'},
+    W2 = #shard{node='node2'},
+    W3 = #shard{node='node3'},
     ?_assertMatch(
-        {ok, #state{workers = [w2, w3]}},
-        handle_message({ok, []}, w1, state0(all, false))
+        {ok, #state{workers = [W2, W3]}},
+        handle_message({ok, []}, W1, state0(all, false))
     ).
 
 
 check_basic_response() ->
     % Check that we've handle a response
+    W1 = #shard{node='node1'},
+    W2 = #shard{node='node2'},
+    W3 = #shard{node='node3'},
     ?_assertMatch(
-        {ok, #state{reply_count = 1, workers = [w2, w3]}},
-        handle_message({ok, [foo1(), bar1()]}, w1, state0(all, false))
+        {ok, #state{reply_count = 1, workers = [W2, W3]}},
+        handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false))
     ).
 
 
 check_finish_quorum() ->
     % Two messages with the same revisions means we're done
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         Expect = {stop, [bar1(), foo1()]},
-        ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, w2, S1))
+        ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1))
     end).
 
 
@@ -363,11 +399,13 @@ check_finish_quorum_newer() ->
     % foo1 should count for foo2 which means we're finished.
     % We also validate that read_repair was triggered.
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         Expect = {stop, [bar1(), foo2()]},
         ok = meck:reset(fabric),
-        ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, w2, S1)),
+        ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)),
         ok = meck:wait(fabric, update_docs, '_', 5000),
         ?assertMatch(
             [{_, {fabric, update_docs, [_, _, _]}, _}],
@@ -380,11 +418,14 @@ check_no_quorum_on_second() ->
     % Quorum not yet met for the foo revision so we
     % would wait for w3
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
         ?assertMatch(
-            {ok, #state{workers = [w3]}},
-            handle_message({ok, [bar1()]}, w2, S1)
+            {ok, #state{workers = [W3]}},
+            handle_message({ok, [bar1()]}, W2, S1)
         )
     end).
 
@@ -394,11 +435,14 @@ check_done_on_third() ->
     % what. Every revision seen in this pattern should be
     % included.
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(all, false),
-        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
-        {ok, S2} = handle_message({ok, [bar1()]}, w2, S1),
+        {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
+        {ok, S2} = handle_message({ok, [bar1()]}, W2, S1),
         Expect = {stop, [bar1(), foo1()]},
-        ?assertEqual(Expect, handle_message({ok, [bar1()]}, w3, S2))
+        ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2))
     end).
 
 
@@ -407,108 +451,234 @@ check_done_on_third() ->
 
 check_specific_revs_first_msg() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), false),
         ?assertMatch(
-            {ok, #state{reply_count = 1, workers = [w2, w3]}},
-            handle_message({ok, [foo1(), bar1(), bazNF()]}, w1, S0)
+            {ok, #state{reply_count = 1, workers = [W2, W3]}},
+            handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0)
         )
     end).
 
 
 check_revs_done_on_agreement() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), false),
         Msg = {ok, [foo1(), bar1(), bazNF()]},
-        {ok, S1} = handle_message(Msg, w1, S0),
+        {ok, S1} = handle_message(Msg, W1, S0),
         Expect = {stop, [bar1(), foo1(), bazNF()]},
-        ?assertEqual(Expect, handle_message(Msg, w2, S1))
+        ?assertEqual(Expect, handle_message(Msg, W2, S1))
     end).
 
 
 check_latest_true() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo2(), bar1(), bazNF()]},
         Msg2 = {ok, [foo2(), bar1(), bazNF()]},
-        {ok, S1} = handle_message(Msg1, w1, S0),
+        {ok, S1} = handle_message(Msg1, W1, S0),
         Expect = {stop, [bar1(), foo2(), bazNF()]},
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1))
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1))
     end).
 
 
 check_ancestor_counted_in_quorum() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), bazNF()]},
         Msg2 = {ok, [foo2(), bar1(), bazNF()]},
         Expect = {stop, [bar1(), foo2(), bazNF()]},
 
         % Older first
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
 
         % Newer first
-        {ok, S2} = handle_message(Msg2, w2, S0),
-        ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+        {ok, S2} = handle_message(Msg2, W2, S0),
+        ?assertEqual(Expect, handle_message(Msg1, W1, S2))
     end).
 
 
 check_not_found_counts_for_descendant() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), bazNF()]},
         Msg2 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
         % not_found first
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
 
         % not_found second
-        {ok, S2} = handle_message(Msg2, w2, S0),
-        ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+        {ok, S2} = handle_message(Msg2, W2, S0),
+        ?assertEqual(Expect, handle_message(Msg1, W1, S2))
     end).
 
 
 check_worker_error_skipped() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {ok, [foo1(), bar1(), baz1()]},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
 check_quorum_only_counts_valid_responses() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {rexi_EXIT, reason},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {ok, [foo1(), bar1(), baz1()]},
         Expect = {stop, [bar1(), baz1(), foo1()]},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
     end).
 
 
 check_empty_list_when_no_workers_reply() ->
     ?_test(begin
+        W1 = #shard{node='node1'},
+        W2 = #shard{node='node2'},
+        W3 = #shard{node='node3'},
         S0 = state0(revs(), true),
         Msg1 = {rexi_EXIT, reason},
         Msg2 = {rexi_EXIT, reason},
         Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil},
         Expect = {stop, all_workers_died},
 
-        {ok, S1} = handle_message(Msg1, w1, S0),
-        {ok, S2} = handle_message(Msg2, w2, S1),
-        ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+        {ok, S1} = handle_message(Msg1, W1, S0),
+        {ok, S2} = handle_message(Msg2, W2, S1),
+        ?assertEqual(Expect, handle_message(Msg3, W3, S2))
+    end).
+
+
+check_node_rev_stored() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [foo1()]}, W1, S0),
+        ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs)
+    end).
+
+
+check_node_rev_store_head_only() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [foo2()]}, W1, S0),
+        ?assertEqual([{node1, [{2, <<"foo2">>}]}], S1#state.node_revs)
+    end).
+
+
+check_node_rev_store_multiple() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [foo1(), foo2()]}, W1, S0),
+        ?assertEqual(
+                [{node1, [{2, <<"foo2">>}, {1, <<"foo">>}]}],
+                S1#state.node_revs
+            )
+    end).
+
+
+check_node_rev_dont_store_errors() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [barNF()]}, W1, S0),
+        ?assertEqual([], S1#state.node_revs)
+    end).
+
+
+check_node_rev_store_non_errors() ->
+    ?_test(begin
+        W1 = #shard{node = node1},
+        S0 = state0([], true),
+
+        {ok, S1} = handle_message({ok, [foo1(), barNF()]}, W1, S0),
+        ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs)
+    end).
+
+
+check_node_rev_store_concatenate() ->
+    ?_test(begin
+        W2 = #shard{node = node2},
+        S0 = state0([], true),
+        S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]},
+
+        {ok, S2} = handle_message({ok, [foo2()]}, W2, S1),
+        ?assertEqual(
+                [{node2, [{2, <<"foo2">>}]}, {node1, [{1, <<"foo">>}]}],
+                S2#state.node_revs
+            )
+    end).
+
+
+check_node_rev_store_concantenate_multiple() ->
+    ?_test(begin
+        W2 = #shard{node = node2},
+        S0 = state0([], true),
+        S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]},
+
+        {ok, S2} = handle_message({ok, [foo2(), bar1()]}, W2, S1),
+        ?assertEqual(
+                [
+                    {node2, [{1, <<"bar">>}, {2, <<"foo2">>}]},
+                    {node1, [{1, <<"foo">>}]}
+                ],
+                S2#state.node_revs
+            )
+    end).
+
+
+check_node_rev_unmodified_on_down_or_exit() ->
+    ?_test(begin
+        W2 = #shard{node = node2},
+        S0 = state0([], true),
+        S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]},
+
+        Down = {rexi_DOWN, nodedown, {nil, node()}, nil},
+        {ok, S2} = handle_message(Down, W2, S1),
+        ?assertEqual(
+                [{node1, [{1, <<"foo">>}]}],
+                S2#state.node_revs
+            ),
+
+        Exit = {rexi_EXIT, reason},
+        {ok, S3} = handle_message(Exit, W2, S1),
+        ?assertEqual(
+                [{node1, [{1, <<"foo">>}]}],
+                S3#state.node_revs
+            )
     end).
 
 
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 4a69e7e..d8a38ba 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -21,6 +21,7 @@
     delete_shard_db_doc/2]).
 -export([get_all_security/2, open_shard/2]).
 -export([compact/1, compact/2]).
+-export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]).
 
 -export([get_db_info/2, get_doc_count/2, get_update_seq/2,
          changes/4, map_view/5, reduce_view/5, group_info/3, update_mrview/4]).
@@ -201,6 +202,9 @@ get_all_security(DbName, Options) ->
 set_revs_limit(DbName, Limit, Options) ->
     with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
 
+set_purge_infos_limit(DbName, Limit, Options) ->
+    with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}).
+
 open_doc(DbName, DocId, Options) ->
     with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
 
@@ -236,14 +240,26 @@ get_missing_revs(DbName, IdRevsList, Options) ->
     end).
 
 update_docs(DbName, Docs0, Options) ->
-    case proplists:get_value(replicated_changes, Options) of
-    true ->
-        X = replicated_changes;
-    _ ->
-        X = interactive_edit
+    {Docs1, Type} = case couch_util:get_value(read_repair, Options) of
+        NodeRevs when is_list(NodeRevs) ->
+            Filtered = read_repair_filter(DbName, Docs0, NodeRevs, Options),
+            {Filtered, replicated_changes};
+        undefined ->
+            X = case proplists:get_value(replicated_changes, Options) of
+                true -> replicated_changes;
+                _ -> interactive_edit
+            end,
+            {Docs0, X}
     end,
-    Docs = make_att_readers(Docs0),
-    with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+    Docs2 = make_att_readers(Docs1),
+    with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, Type]}).
+
+
+get_purge_seq(DbName, Options) ->
+    with_db(DbName, Options, {couch_db, get_purge_seq, []}).
+
+purge_docs(DbName, UUIdsIdsRevs, Options) ->
+    with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, Options]}).
 
 %% @equiv group_info(DbName, DDocId, [])
 group_info(DbName, DDocId) ->
@@ -298,6 +314,104 @@ with_db(DbName, Options, {M,F,A}) ->
         rexi:reply(Error)
     end.
 
+
+read_repair_filter(DbName, Docs, NodeRevs, Options) ->
+    set_io_priority(DbName, Options),
+    case get_or_create_db(DbName, Options) of
+        {ok, Db} ->
+            try
+                read_repair_filter(Db, Docs, NodeRevs)
+            after
+                couch_db:close(Db)
+            end;
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
+% A read repair operation may have been triggered by a node
+% that was out of sync with the local node. Thus, any time
+% we receive a read repair request we need to check if we
+% may have recently purged any of the given revisions and
+% ignore them if so.
+%
+% This is accomplished by looking at the purge infos that we
+% have locally that have not been replicated to the remote
+% node. The logic here is that we may have received the purge
+% request before the remote shard copy. So to check that we
+% need to look at the purge infos that we have locally but
+% have not yet sent to the remote copy.
+%
+% NodeRevs is a list of the {node(), [rev()]} tuples passed
+% as the read_repair option to update_docs.
+read_repair_filter(Db, Docs, NodeRevs) ->
+    [#doc{id = DocId} | _] = Docs,
+    Nodes = lists:usort([Node || {Node, _} <- NodeRevs, Node /= node()]),
+    NodeSeqs = get_node_seqs(Db, Nodes),
+
+    DbPSeq = couch_db:get_purge_seq(Db),
+    Lag = config:get_integer("couchdb", "read_repair_lag", 100),
+
+    % Filter out read-repair updates from any node that is
+    % so out of date that it would force us to scan a large
+    % number of purge infos
+    NodeFiltFun = fun({Node, _Revs}) ->
+        {Node, NodeSeq} = lists:keyfind(Node, 1, NodeSeqs),
+        NodeSeq >= DbPSeq - Lag
+    end,
+    RecentNodeRevs = lists:filter(NodeFiltFun, NodeRevs),
+
+    % For each node we scan the purge infos to filter out any
+    % revisions that have been locally purged since we last
+    % replicated to the remote node's shard copy.
+    AllowableRevs = lists:foldl(fun({Node, Revs}, RevAcc) ->
+        {Node, StartSeq} = lists:keyfind(Node, 1, NodeSeqs),
+        FoldFun = fun({_PSeq, _UUID, PDocId, PRevs}, InnerAcc) ->
+            if PDocId /= DocId -> {ok, InnerAcc}; true ->
+                {ok, InnerAcc -- PRevs}
+            end
+        end,
+        {ok, FiltRevs} = couch_db:fold_purge_infos(Db, StartSeq, FoldFun, Revs),
+        lists:usort(FiltRevs ++ RevAcc)
+    end, [], RecentNodeRevs),
+
+    % Finally, filter the doc updates to only include revisions
+    % that have not been purged locally.
+    DocFiltFun = fun(#doc{revs = {Pos, [Rev | _]}}) ->
+        lists:member({Pos, Rev}, AllowableRevs)
+    end,
+    lists:filter(DocFiltFun, Docs).
+
+
+get_node_seqs(Db, Nodes) ->
+    % Gather the list of {Node, PurgeSeq} pairs for all nodes
+    % that are present in our read repair group
+    FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+        case Id of
+            <<?LOCAL_DOC_PREFIX, "purge-mem3-", _/binary>> ->
+                TgtNode = couch_util:get_value(<<"target_node">>, Props),
+                PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                case lists:keyfind(TgtNode, 1, Acc) of
+                    {_, OldSeq} ->
+                        NewSeq = erlang:max(OldSeq, PurgeSeq),
+                        NewEntry = {TgtNode, NewSeq},
+                        NewAcc = lists:keyreplace(TgtNode, 1, Acc, NewEntry),
+                        {ok, NewAcc};
+                    false ->
+                        {ok, Acc}
+                end;
+            _ ->
+                % We've processed all _local mem3 purge docs
+                {stop, Acc}
+        end
+    end,
+    InitAcc = [{list_to_binary(atom_to_list(Node)), 0} || Node <- Nodes],
+    Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-mem3-">>}],
+    {ok, NodeBinSeqs} = couch_db:fold_local_docs(Db, FoldFun, InitAcc, Opts),
+    [{list_to_existing_atom(binary_to_list(N)), S} || {N, S} <- NodeBinSeqs].
+
+
+
 get_or_create_db(DbName, Options) ->
     couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
 
diff --git a/src/fabric/test/fabric_rpc_purge_tests.erl b/src/fabric/test/fabric_rpc_purge_tests.erl
new file mode 100644
index 0000000..26507cf
--- /dev/null
+++ b/src/fabric/test/fabric_rpc_purge_tests.erl
@@ -0,0 +1,285 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_rpc_purge_tests).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TDEF(A), {A, fun A/1}).
+
+% TODO: Add tests:
+%         - filter some updates
+%         - allow for an update that was filtered by a node
+%         - ignore lagging nodes
+
+main_test_() ->
+    {
+        setup,
+        spawn,
+        fun setup_all/0,
+        fun teardown_all/1,
+        [
+            {
+                foreach,
+                fun setup_no_purge/0,
+                fun teardown_no_purge/1,
+                lists:map(fun wrap/1, [
+                    ?TDEF(t_no_purge_no_filter)
+                ])
+            },
+            {
+                foreach,
+                fun setup_single_purge/0,
+                fun teardown_single_purge/1,
+                lists:map(fun wrap/1, [
+                    ?TDEF(t_filter),
+                    ?TDEF(t_filter_unknown_node),
+                    ?TDEF(t_no_filter_old_node),
+                    ?TDEF(t_no_filter_different_node),
+                    ?TDEF(t_no_filter_after_repl)
+                ])
+            },
+            {
+                foreach,
+                fun setup_multi_purge/0,
+                fun teardown_multi_purge/1,
+                lists:map(fun wrap/1, [
+                    ?TDEF(t_filter),
+                    ?TDEF(t_filter_unknown_node),
+                    ?TDEF(t_no_filter_old_node),
+                    ?TDEF(t_no_filter_different_node),
+                    ?TDEF(t_no_filter_after_repl)
+                ])
+            }
+        ]
+    }.
+
+
+setup_all() ->
+    test_util:start_couch().
+
+
+teardown_all(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+
+setup_no_purge() ->
+    {ok, Db} = create_db(),
+    populate_db(Db),
+    couch_db:name(Db).
+
+
+teardown_no_purge(DbName) ->
+    ok = couch_server:delete(DbName, []).
+
+
+setup_single_purge() ->
+    DbName = setup_no_purge(),
+    DocId = <<"0003">>,
+    {ok, OldDoc} = open_doc(DbName, DocId),
+    purge_doc(DbName, DocId),
+    {DbName, DocId, OldDoc, 1}.
+
+
+teardown_single_purge({DbName, _, _, _}) ->
+    teardown_no_purge(DbName).
+
+
+setup_multi_purge() ->
+    DbName = setup_no_purge(),
+    DocId = <<"0003">>,
+    {ok, OldDoc} = open_doc(DbName, DocId),
+    lists:foreach(fun(I) ->
+        PDocId = iolist_to_binary(io_lib:format("~4..0b", [I])),
+        purge_doc(DbName, PDocId)
+    end, lists:seq(1, 5)),
+    {DbName, DocId, OldDoc, 3}.
+
+
+teardown_multi_purge(Ctx) ->
+    teardown_single_purge(Ctx).
+
+
+t_no_purge_no_filter(DbName) ->
+    DocId = <<"0003">>,
+
+    {ok, OldDoc} = open_doc(DbName, DocId),
+    NewDoc = create_update(OldDoc, 2),
+
+    rpc_update_doc(DbName, NewDoc),
+
+    {ok, CurrDoc} = open_doc(DbName, DocId),
+    ?assert(CurrDoc /= OldDoc),
+    ?assert(CurrDoc == NewDoc).
+
+
+t_filter({DbName, DocId, OldDoc, _PSeq}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)).
+
+
+t_filter_unknown_node({DbName, DocId, OldDoc, _PSeq}) ->
+    % Unknown nodes are assumed to start at PurgeSeq = 0
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    {Pos, [Rev | _]} = OldDoc#doc.revs,
+    RROpt = {read_repair, [{'blargh@127.0.0.1', [{Pos, Rev}]}]},
+    rpc_update_doc(DbName, OldDoc, [RROpt]),
+
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)).
+
+
+t_no_filter_old_node({DbName, DocId, OldDoc, PSeq}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, PSeq),
+
+    % The random UUID is to generate a badarg exception when
+    % we try and convert it to an existing atom.
+    create_purge_checkpoint(DbName, 0, couch_uuids:random()),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+t_no_filter_different_node({DbName, DocId, OldDoc, PSeq}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, PSeq),
+
+    % Create a valid purge for a different node
+    TgtNode = list_to_binary(atom_to_list('notfoo@127.0.0.1')),
+    create_purge_checkpoint(DbName, 0, TgtNode),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+t_no_filter_after_repl({DbName, DocId, OldDoc, PSeq}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, PSeq),
+
+    rpc_update_doc(DbName, OldDoc),
+
+    ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+wrap({Name, Fun}) ->
+    fun(Arg) ->
+        {timeout, 60, {atom_to_list(Name), fun() ->
+            process_flag(trap_exit, true),
+            Fun(Arg)
+        end}}
+    end.
+
+
+create_db() ->
+    DbName = ?tempdb(),
+    couch_db:create(DbName, [?ADMIN_CTX]).
+
+
+populate_db(Db) ->
+    Docs = lists:map(fun(Idx) ->
+        DocId = lists:flatten(io_lib:format("~4..0b", [Idx])),
+        #doc{
+            id = list_to_binary(DocId),
+            body = {[{<<"int">>, Idx}, {<<"vsn">>, 2}]}
+        }
+    end, lists:seq(1, 100)),
+    {ok, _} = couch_db:update_docs(Db, Docs).
+
+
+open_doc(DbName, DocId) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        couch_db:open_doc(Db, DocId, [])
+    end).
+
+
+create_update(Doc, NewVsn) ->
+    #doc{
+        id = DocId,
+        revs = {Pos, [Rev | _] = Revs},
+        body = {Props}
+    } = Doc,
+    NewProps = lists:keyreplace(<<"vsn">>, 1, Props, {<<"vsn">>, NewVsn}),
+    NewRev = crypto:hash(md5, term_to_binary({DocId, Rev, {NewProps}})),
+    Doc#doc{
+        revs = {Pos + 1, [NewRev | Revs]},
+        body = {NewProps}
+    }.
+
+
+purge_doc(DbName, DocId) ->
+    {ok, Doc} = open_doc(DbName, DocId),
+    {Pos, [Rev | _]} = Doc#doc.revs,
+    PInfo = {couch_uuids:random(), DocId, [{Pos, Rev}]},
+    Resp = couch_util:with_db(DbName, fun(Db) ->
+        couch_db:purge_docs(Db, [PInfo], [])
+    end),
+    ?assertEqual({ok, [{ok, [{Pos, Rev}]}]}, Resp).
+
+
+create_purge_checkpoint(DbName, PurgeSeq) ->
+    create_purge_checkpoint(DbName, PurgeSeq, tgt_node_bin()).
+
+
+create_purge_checkpoint(DbName, PurgeSeq, TgtNode) when is_binary(TgtNode) ->
+    Resp = couch_util:with_db(DbName, fun(Db) ->
+        SrcUUID = couch_db:get_uuid(Db),
+        TgtUUID = couch_uuids:random(),
+        CPDoc = #doc{
+            id = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
+            body = {[
+                {<<"target_node">>, TgtNode},
+                {<<"purge_seq">>, PurgeSeq}
+            ]}
+        },
+        couch_db:update_docs(Db, [CPDoc], [])
+    end),
+    ?assertMatch({ok, [_]}, Resp).
+
+
+rpc_update_doc(DbName, Doc) ->
+    {Pos, [Rev | _]} = Doc#doc.revs,
+    RROpt = {read_repair, [{tgt_node(), [{Pos, Rev}]}]},
+    rpc_update_doc(DbName, Doc, [RROpt]).
+
+
+rpc_update_doc(DbName, Doc, Opts) ->
+    Ref = erlang:make_ref(),
+    put(rexi_from, {self(), Ref}),
+    fabric_rpc:update_docs(DbName, [Doc], Opts),
+    Reply = test_util:wait(fun() ->
+        receive
+            {Ref, Reply} ->
+                Reply
+        after 0 ->
+            wait
+        end
+    end),
+    ?assertEqual({ok, []}, Reply).
+
+
+tgt_node() ->
+    'foo@127.0.0.1'.
+
+
+tgt_node_bin() ->
+    iolist_to_binary(atom_to_list(tgt_node())).
\ No newline at end of file


[couchdb] 01/06: [05/10] Clustered Purge: Add upgrade tests

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f291730ac026cc616ec7e34ae2b397d35986f9c1
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed May 30 17:01:01 2018 -0500

    [05/10] Clustered Purge: Add upgrade tests
    
    These test that we can successfully upgrade old databases that have
    various configurations of purge requests in the legacy format.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
    Co-authored-by: jiangphcn <ji...@cn.ibm.com>
---
 src/couch/test/couch_bt_engine_upgrade_tests.erl   | 220 +++++++++++++++++++++
 src/couch/test/fixtures/db_with_1_purge_req.couch  | Bin 0 -> 12470 bytes
 .../fixtures/db_with_1_purge_req_for_2_docs.couch  | Bin 0 -> 16557 bytes
 src/couch/test/fixtures/db_with_2_purge_req.couch  | Bin 0 -> 16566 bytes
 src/couch/test/fixtures/db_without_purge_req.couch | Bin 0 -> 61644 bytes
 src/couch_pse_tests/src/cpse_test_purge_docs.erl   |  92 ++++-----
 src/couch_pse_tests/src/cpse_test_purge_seqs.erl   |  28 +--
 src/couch_pse_tests/src/cpse_util.erl              |   6 +-
 8 files changed, 283 insertions(+), 63 deletions(-)

diff --git a/src/couch/test/couch_bt_engine_upgrade_tests.erl b/src/couch/test/couch_bt_engine_upgrade_tests.erl
new file mode 100644
index 0000000..1d2a86d
--- /dev/null
+++ b/src/couch/test/couch_bt_engine_upgrade_tests.erl
@@ -0,0 +1,220 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_upgrade_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+setup() ->
+    Ctx = test_util:start_couch(),
+    DbDir = config:get("couchdb", "database_dir"),
+    DbFileNames = [
+        "db_without_purge_req.couch",
+        "db_with_1_purge_req.couch",
+        "db_with_2_purge_req.couch",
+        "db_with_1_purge_req_for_2_docs.couch"
+    ],
+    NewPaths = lists:map(fun(DbFileName) ->
+        OldDbFilePath = filename:join([?FIXTURESDIR, DbFileName]),
+        NewDbFilePath = filename:join([DbDir, DbFileName]),
+        ok = filelib:ensure_dir(NewDbFilePath),
+        file:delete(NewDbFilePath),
+        {ok, _} = file:copy(OldDbFilePath, NewDbFilePath),
+        NewDbFilePath
+    end, DbFileNames),
+    {Ctx, NewPaths}.
+
+
+teardown({Ctx, Paths}) ->
+    test_util:stop_couch(Ctx),
+    lists:foreach(fun(Path) ->
+        file:delete(Path)
+    end, Paths).
+
+
+upgrade_test_() ->
+    {
+        "Couch Bt Engine Upgrade tests",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                t_upgrade_without_purge_req(),
+                t_upgrade_with_1_purge_req(),
+                t_upgrade_with_N_purge_req(),
+                t_upgrade_with_1_purge_req_for_2_docs()
+            ]
+        }
+    }.
+
+
+t_upgrade_without_purge_req() ->
+    ?_test(begin
+        % There are three documents in the fixture
+        % db with zero purge entries
+        DbName = <<"db_without_purge_req">>,
+
+        {ok, UpgradedPurged} = couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual(0, couch_db:get_purge_seq(Db)),
+            couch_db:fold_purge_infos(Db, 0, fun fold_fun/2, [])
+        end),
+        ?assertEqual([], UpgradedPurged),
+
+        {ok, Rev} = save_doc(
+            DbName, {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]}
+        ),
+        {ok, _} = save_doc(DbName, {[{<<"_id">>, <<"doc5">>}, {<<"v">>, 2}]}),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 5}, couch_db:get_doc_count(Db)),
+            ?assertEqual(0, couch_db:get_purge_seq(Db))
+        end),
+
+        PurgeReqs = [
+            {couch_uuids:random(), <<"doc4">>, [Rev]}
+        ],
+
+        {ok, [{ok, PRevs}]} = couch_util:with_db(DbName, fun(Db) ->
+            couch_db:purge_docs(Db, PurgeReqs)
+        end),
+        ?assertEqual(PRevs, [Rev]),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 4}, couch_db:get_doc_count(Db)),
+            ?assertEqual(1, couch_db:get_purge_seq(Db))
+        end)
+    end).
+
+
+t_upgrade_with_1_purge_req() ->
+    ?_test(begin
+        % There are two documents in the fixture database
+        % with a single purge entry
+        DbName = <<"db_with_1_purge_req">>,
+
+        {ok, UpgradedPurged} = couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual(1, couch_db:get_purge_seq(Db)),
+            couch_db:fold_purge_infos(Db, 0, fun fold_fun/2, [])
+        end),
+        ?assertEqual([{1, <<"doc1">>}], UpgradedPurged),
+
+        {ok, Rev} = save_doc(
+            DbName, {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]}
+        ),
+        {ok, _} = save_doc(DbName, {[{<<"_id">>, <<"doc5">>}, {<<"v">>, 2}]}),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 4}, couch_db:get_doc_count(Db)),
+            ?assertEqual(1, couch_db:get_purge_seq(Db))
+        end),
+
+        PurgeReqs = [
+            {couch_uuids:random(), <<"doc4">>, [Rev]}
+        ],
+
+        {ok, [{ok, PRevs}]} = couch_util:with_db(DbName, fun(Db) ->
+            couch_db:purge_docs(Db, PurgeReqs)
+        end),
+        ?assertEqual(PRevs, [Rev]),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 3}, couch_db:get_doc_count(Db)),
+            ?assertEqual(2, couch_db:get_purge_seq(Db))
+        end)
+    end).
+
+
+t_upgrade_with_N_purge_req() ->
+    ?_test(begin
+        % There is one document in the fixture database
+        % with two docs that have been purged
+        DbName = <<"db_with_2_purge_req">>,
+
+        {ok, UpgradedPurged} = couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual(2, couch_db:get_purge_seq(Db)),
+            couch_db:fold_purge_infos(Db, 1, fun fold_fun/2, [])
+        end),
+        ?assertEqual([{2, <<"doc2">>}], UpgradedPurged),
+
+        {ok, Rev} = save_doc(DbName, {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]}),
+        {ok, _} = save_doc(DbName, {[{<<"_id">>, <<"doc5">>}, {<<"v">>, 2}]}),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 3}, couch_db:get_doc_count(Db)),
+            ?assertEqual(2, couch_db:get_purge_seq(Db))
+        end),
+
+        PurgeReqs = [
+            {couch_uuids:random(), <<"doc4">>, [Rev]}
+        ],
+
+        {ok, [{ok, PRevs}]} = couch_util:with_db(DbName, fun(Db) ->
+            couch_db:purge_docs(Db, PurgeReqs)
+        end),
+        ?assertEqual(PRevs, [Rev]),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 2}, couch_db:get_doc_count(Db)),
+            ?assertEqual(3, couch_db:get_purge_seq(Db))
+        end)
+    end).
+
+
+t_upgrade_with_1_purge_req_for_2_docs() ->
+    ?_test(begin
+        % There are two documents (Doc4 and Doc5) in the fixture database
+        % with three docs (Doc1, Doc2 and Doc3) that have been purged, and
+        % with one purge req for Doc1 and another purge req for Doc 2 and Doc3
+        DbName = <<"db_with_1_purge_req_for_2_docs">>,
+
+        {ok, UpgradedPurged} = couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual(3, couch_db:get_purge_seq(Db)),
+            couch_db:fold_purge_infos(Db, 1, fun fold_fun/2, [])
+        end),
+        ?assertEqual([{3,<<"doc2">>},{2,<<"doc3">>}], UpgradedPurged),
+
+        {ok, Rev} = save_doc(DbName, {[{<<"_id">>, <<"doc6">>}, {<<"v">>, 1}]}),
+        {ok, _} = save_doc(DbName, {[{<<"_id">>, <<"doc7">>}, {<<"v">>, 2}]}),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 4}, couch_db:get_doc_count(Db)),
+            ?assertEqual(3, couch_db:get_purge_seq(Db))
+        end),
+
+        PurgeReqs = [
+            {couch_uuids:random(), <<"doc6">>, [Rev]}
+        ],
+
+        {ok, [{ok, PRevs}]} = couch_util:with_db(DbName, fun(Db) ->
+            couch_db:purge_docs(Db, PurgeReqs)
+        end),
+        ?assertEqual(PRevs, [Rev]),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 3}, couch_db:get_doc_count(Db)),
+            ?assertEqual(4, couch_db:get_purge_seq(Db))
+        end)
+    end).
+
+
+save_doc(DbName, Json) ->
+    Doc = couch_doc:from_json_obj(Json),
+    couch_util:with_db(DbName, fun(Db) ->
+        couch_db:update_doc(Db, Doc, [])
+    end).
+
+
+fold_fun({PSeq, _UUID, Id, _Revs}, Acc) ->
+    {ok, [{PSeq, Id} | Acc]}.
diff --git a/src/couch/test/fixtures/db_with_1_purge_req.couch b/src/couch/test/fixtures/db_with_1_purge_req.couch
new file mode 100644
index 0000000..b0d39c9
Binary files /dev/null and b/src/couch/test/fixtures/db_with_1_purge_req.couch differ
diff --git a/src/couch/test/fixtures/db_with_1_purge_req_for_2_docs.couch b/src/couch/test/fixtures/db_with_1_purge_req_for_2_docs.couch
new file mode 100644
index 0000000..b584fce
Binary files /dev/null and b/src/couch/test/fixtures/db_with_1_purge_req_for_2_docs.couch differ
diff --git a/src/couch/test/fixtures/db_with_2_purge_req.couch b/src/couch/test/fixtures/db_with_2_purge_req.couch
new file mode 100644
index 0000000..ee4e11b
Binary files /dev/null and b/src/couch/test/fixtures/db_with_2_purge_req.couch differ
diff --git a/src/couch/test/fixtures/db_without_purge_req.couch b/src/couch/test/fixtures/db_without_purge_req.couch
new file mode 100644
index 0000000..814feb8
Binary files /dev/null and b/src/couch/test/fixtures/db_without_purge_req.couch differ
diff --git a/src/couch_pse_tests/src/cpse_test_purge_docs.erl b/src/couch_pse_tests/src/cpse_test_purge_docs.erl
index 3378825..34bd34d 100644
--- a/src/couch_pse_tests/src/cpse_test_purge_docs.erl
+++ b/src/couch_pse_tests/src/cpse_test_purge_docs.erl
@@ -33,13 +33,13 @@ teardown_each(DbName) ->
 cpse_purge_simple(DbName) ->
     {ok, Rev} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, 1.1}]}),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 1},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos = [
         {cpse_util:uuid(), <<"foo1">>, [Rev]}
@@ -48,13 +48,13 @@ cpse_purge_simple(DbName) ->
     {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos),
     ?assertEqual([Rev], PRevs),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 0},
         {update_seq, 2},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_simple_info_check(DbName) ->
@@ -80,14 +80,14 @@ cpse_purge_empty_db(DbName) ->
     {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos),
     ?assertEqual([], PRevs),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 0},
         {update_seq, 1},
         {changes, 0},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_single_docid(DbName) ->
@@ -96,14 +96,14 @@ cpse_purge_single_docid(DbName) ->
         {[{'_id', foo2}, {vsn, 2}]}
     ]),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 2},
         {changes, 2},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos = [
         {cpse_util:uuid(), <<"foo1">>, [Rev1]}
@@ -111,14 +111,14 @@ cpse_purge_single_docid(DbName) ->
     {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos),
     ?assertEqual([Rev1], PRevs),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 3},
         {changes, 1},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_multiple_docids(DbName) ->
@@ -127,14 +127,14 @@ cpse_purge_multiple_docids(DbName) ->
         {[{'_id', foo2}, {vsn, 1.2}]}
     ]),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 2},
         {changes, 2},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos = [
         {cpse_util:uuid(), <<"foo1">>, [Rev1]},
@@ -146,14 +146,14 @@ cpse_purge_multiple_docids(DbName) ->
     ?assertEqual([Rev1], PRevs1),
     ?assertEqual([Rev2], PRevs2),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 0},
         {update_seq, 3},
         {changes, 0},
         {purge_seq, 2},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_no_docids(DbName) ->
@@ -162,25 +162,25 @@ cpse_purge_no_docids(DbName) ->
         {[{'_id', foo2}, {vsn, 2}]}
     ]),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 2},
         {changes, 2},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     {ok, []} = cpse_util:purge(DbName, []),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 2},
         {changes, 2},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_rev_path(DbName) ->
@@ -193,14 +193,14 @@ cpse_purge_rev_path(DbName) ->
     ]},
     {ok, Rev2} = cpse_util:save_doc(DbName, Update),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 1},
         {update_seq, 2},
         {changes, 1},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos = [
         {cpse_util:uuid(), <<"foo">>, [Rev2]}
@@ -209,14 +209,14 @@ cpse_purge_rev_path(DbName) ->
     {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos),
     ?assertEqual([Rev2], PRevs),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 0},
         {update_seq, 3},
         {changes, 0},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_deep_revision_path(DbName) ->
@@ -238,14 +238,14 @@ cpse_purge_deep_revision_path(DbName) ->
     {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos),
     ?assertEqual([LastRev], PRevs),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 0},
         {update_seq, ?REV_DEPTH + 2},
         {changes, 0},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_partial_revs(DbName) ->
@@ -264,14 +264,14 @@ cpse_purge_partial_revs(DbName) ->
     {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos),
     ?assertEqual([Rev1], PRevs),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 3},
         {changes, 1},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_missing_docid(DbName) ->
@@ -280,14 +280,14 @@ cpse_purge_missing_docid(DbName) ->
         {[{'_id', foo2}, {vsn, 2}]}
     ]),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 2},
         {changes, 2},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos = [
         {cpse_util:uuid(), <<"baz">>, [Rev1]}
@@ -295,14 +295,14 @@ cpse_purge_missing_docid(DbName) ->
 
     {ok, [{ok, []}]} = cpse_util:purge(DbName, PurgeInfos),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 3},
         {changes, 2},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_duplicate_docids(DbName) ->
@@ -311,14 +311,14 @@ cpse_purge_duplicate_docids(DbName) ->
         {[{'_id', foo2}, {vsn, 2}]}
     ]),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 2},
         {purge_seq, 0},
         {changes, 2},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos = [
         {cpse_util:uuid(), <<"foo1">>, [Rev1]},
@@ -328,14 +328,14 @@ cpse_purge_duplicate_docids(DbName) ->
     {ok, Resp} = cpse_util:purge(DbName, PurgeInfos),
     ?assertEqual([{ok, [Rev1]}, {ok, []}], Resp),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 3},
         {purge_seq, 2},
         {changes, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_internal_revision(DbName) ->
@@ -354,14 +354,14 @@ cpse_purge_internal_revision(DbName) ->
     {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos),
     ?assertEqual([], PRevs),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 3},
         {changes, 1},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_missing_revision(DbName) ->
@@ -377,14 +377,14 @@ cpse_purge_missing_revision(DbName) ->
     {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos),
     ?assertEqual([], PRevs),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 3},
         {changes, 2},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_repeated_revisions(DbName) ->
@@ -396,14 +396,14 @@ cpse_purge_repeated_revisions(DbName) ->
     ]},
     {ok, [Rev2]} = cpse_util:save_docs(DbName, [Update], [replicated_changes]),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 2},
         {changes, 1},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos1 = [
         {cpse_util:uuid(), <<"foo">>, [Rev1]},
@@ -414,27 +414,27 @@ cpse_purge_repeated_revisions(DbName) ->
     ?assertEqual([Rev1], PRevs1),
     ?assertEqual([Rev2], PRevs2),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 0},
         {update_seq, 3},
         {changes, 0},
         {purge_seq, 2},
         {purge_infos, PurgeInfos1}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_purge_repeated_uuid(DbName) ->
     {ok, Rev} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, 1.1}]}),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 1},
         {changes, 1},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos = [
         {cpse_util:uuid(), <<"foo1">>, [Rev]}
@@ -449,14 +449,14 @@ cpse_purge_repeated_uuid(DbName) ->
     % Although we can replicate it in
     {ok, []} = cpse_util:purge(DbName, PurgeInfos, [replicated_changes]),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 0},
         {update_seq, 2},
         {changes, 0},
         {purge_seq, 1},
         {purge_infos, PurgeInfos}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 fold_all_infos(Info, Acc) ->
diff --git a/src/couch_pse_tests/src/cpse_test_purge_seqs.erl b/src/couch_pse_tests/src/cpse_test_purge_seqs.erl
index b7b49b3..c061747 100644
--- a/src/couch_pse_tests/src/cpse_test_purge_seqs.erl
+++ b/src/couch_pse_tests/src/cpse_test_purge_seqs.erl
@@ -29,13 +29,13 @@ cpse_increment_purge_seq_on_complete_purge(DbName) ->
     {ok, Rev1} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, 1.1}]}),
     {ok, Rev2} = cpse_util:save_doc(DbName, {[{'_id', foo2}, {vsn, 1.2}]}),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 2},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos1 = [
         {cpse_util:uuid(), <<"foo1">>, [Rev1]}
@@ -43,13 +43,13 @@ cpse_increment_purge_seq_on_complete_purge(DbName) ->
     {ok, [{ok, PRevs1}]} = cpse_util:purge(DbName, PurgeInfos1),
     ?assertEqual([Rev1], PRevs1),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 3},
         {purge_seq, 1},
         {purge_infos, PurgeInfos1}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos2 = [
         {cpse_util:uuid(), <<"foo2">>, [Rev2]}
@@ -57,26 +57,26 @@ cpse_increment_purge_seq_on_complete_purge(DbName) ->
     {ok, [{ok, PRevs2}]} = cpse_util:purge(DbName, PurgeInfos2),
     ?assertEqual([Rev2], PRevs2),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 0},
         {update_seq, 4},
         {purge_seq, 2},
         {purge_infos, PurgeInfos1 ++ PurgeInfos2}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_increment_purge_multiple_times(DbName) ->
     {ok, Rev1} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, 1.1}]}),
     {ok, Rev2} = cpse_util:save_doc(DbName, {[{'_id', foo2}, {vsn, 1.2}]}),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 2},
         {del_doc_count, 0},
         {update_seq, 2},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos1 = [
         {cpse_util:uuid(), <<"foo1">>, [Rev1]},
@@ -86,13 +86,13 @@ cpse_increment_purge_multiple_times(DbName) ->
     ?assertEqual([Rev1], PRevs1),
     ?assertEqual([Rev2], PRevs2),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 0},
         {del_doc_count, 0},
         {update_seq, 3},
         {purge_seq, 2},
         {purge_infos, PurgeInfos1}
-    ], ?MODULE, ?LINE).
+    ]).
 
 
 cpse_increment_purge_seq_on_partial_purge(DbName) ->
@@ -101,13 +101,13 @@ cpse_increment_purge_seq_on_partial_purge(DbName) ->
     {ok, Rev1} = cpse_util:save_doc(DbName, Doc1),
     {ok, Rev2} = cpse_util:save_doc(DbName, Doc2, [replicated_changes]),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 2},
         {purge_seq, 0},
         {purge_infos, []}
-    ], ?MODULE, ?LINE),
+    ]),
 
     PurgeInfos1 = [
         {cpse_util:uuid(), <<"foo1">>, [Rev1]}
@@ -115,10 +115,10 @@ cpse_increment_purge_seq_on_partial_purge(DbName) ->
     {ok, [{ok, PRevs1}]} = cpse_util:purge(DbName, PurgeInfos1),
     ?assertEqual([Rev1], PRevs1),
 
-    cpse_util:assert_db_props(DbName, [
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
         {doc_count, 1},
         {del_doc_count, 0},
         {update_seq, 3},
         {purge_seq, 1},
         {purge_infos, PurgeInfos1}
-    ], ?MODULE, ?LINE).
+    ]).
diff --git a/src/couch_pse_tests/src/cpse_util.erl b/src/couch_pse_tests/src/cpse_util.erl
index 955a974..9dae9a0 100644
--- a/src/couch_pse_tests/src/cpse_util.erl
+++ b/src/couch_pse_tests/src/cpse_util.erl
@@ -182,10 +182,10 @@ uuid() ->
     couch_uuids:random().
 
 
-assert_db_props(DbName, Props, Module, Line) when is_binary(DbName) ->
+assert_db_props(Module, Line, DbName, Props) when is_binary(DbName) ->
     {ok, Db} = couch_db:open_int(DbName, []),
     try
-        assert_db_props(Db, Props, Module, Line)
+        assert_db_props(Module, Line, Db, Props)
     catch error:{assertEqual, Props} ->
         {_, Rest} = proplists:split(Props, [module, line]),
         erlang:error({assertEqual, [{module, Module}, {line, Line} | Rest]})
@@ -193,7 +193,7 @@ assert_db_props(DbName, Props, Module, Line) when is_binary(DbName) ->
         couch_db:close(Db)
     end;
 
-assert_db_props(Db, Props, Module, Line) ->
+assert_db_props(Module, Line, Db, Props) ->
     try
         assert_each_prop(Db, Props)
     catch error:{assertEqual, Props} ->


[couchdb] 06/06: [10/10] Clustered Purge: Clustered HTTP API

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a6c269e76e8633e0d7c48d6754eb5c888f1322a9
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Apr 24 12:27:58 2018 -0500

    [10/10] Clustered Purge: Clustered HTTP API
    
    The HTTP API for clustered purge is fairly straightforward. It is
    designed to match the general shape of the single node API. The only
    major caveat here is that the purge sequence is now hardcoded as null
    since the purge sequence would now otherwise be an opaque blob similar
    to the update_seq blobs.
    
    Its important to note that there is as yet no API invented for
    traversing the history of purge requests in any shape or form as that
    would mostly invalidate the entire purpose of using purge to remove any
    trace of a document from a database at the HTTP level. Although there
    will still be traces in individual shard files until all database
    components have processed the purge and compaction has run (while
    allowing for up to purge_infos_limit requests to remain available in
    perpetuity).
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
    Co-authored-by: jiangphcn <ji...@cn.ibm.com>
---
 src/chttpd/src/chttpd_db.erl           |  58 ++++--
 src/chttpd/test/chttpd_purge_tests.erl | 320 +++++++++++++++++++++++++++++++++
 test/javascript/tests/erlang_views.js  |   5 +-
 test/javascript/tests/purge.js         |  27 +--
 4 files changed, 378 insertions(+), 32 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index ed0adea..8bd4b7b 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -496,24 +496,33 @@ db_req(#httpd{path_parts=[_, <<"_bulk_get">>]}=Req, _Db) ->
 
 
 db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
+    couch_stats:increment_counter([couchdb, httpd, purge_requests]),
     chttpd:validate_ctype(Req, "application/json"),
+    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
+    Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}],
     {IdsRevs} = chttpd:json_body_obj(Req),
     IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
-    case fabric:purge_docs(Db, IdsRevs2) of
-    {ok, PurgeSeq, PurgedIdsRevs} ->
-        PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs}
-            <- PurgedIdsRevs],
-        send_json(Req, 200, {[
-            {<<"purge_seq">>, PurgeSeq},
-            {<<"purged">>, {PurgedIdsRevs2}}
-        ]});
-    Error ->
-        throw(Error)
-    end;
+    MaxIds = config:get_integer("purge", "max_document_id_number", 100),
+    case length(IdsRevs2) =< MaxIds of
+        false -> throw({bad_request, "Exceeded maximum number of documents."});
+        true -> ok
+    end,
+    RevsLen = lists:foldl(fun({_Id, Revs}, Acc) ->
+        length(Revs) + Acc
+    end, 0, IdsRevs2),
+    MaxRevs = config:get_integer("purge", "max_revisions_number", 1000),
+    case RevsLen =< MaxRevs of
+        false -> throw({bad_request, "Exceeded maximum number of revisions."});
+        true -> ok
+    end,
+    {ok, Results} = fabric:purge_docs(Db, IdsRevs2, Options),
+    {Code, Json} = purge_results_to_json(IdsRevs2, Results),
+    send_json(Req, Code, {[{<<"purge_seq">>, null}, {<<"purged">>, {Json}}]});
 
 db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");
 
+
 db_req(#httpd{method='GET',path_parts=[_,OP]}=Req, Db) when ?IS_ALL_DOCS(OP) ->
     case chttpd:qs_json_value(Req, "keys", nil) of
     Keys when is_list(Keys) ->
@@ -623,6 +632,19 @@ db_req(#httpd{method='GET',path_parts=[_,<<"_revs_limit">>]}=Req, Db) ->
 db_req(#httpd{path_parts=[_,<<"_revs_limit">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "PUT,GET");
 
+db_req(#httpd{method='PUT',path_parts=[_,<<"_purged_infos_limit">>]}=Req, Db) ->
+    Options = [{user_ctx, Req#httpd.user_ctx}],
+    case chttpd:json_body(Req) of
+        Limit when is_integer(Limit), Limit > 0 ->
+            ok = fabric:set_purge_infos_limit(Db, Limit, Options),
+            send_json(Req, {[{<<"ok">>, true}]});
+        _->
+            throw({bad_request, "`purge_infos_limit` must be positive integer"})
+    end;
+
+db_req(#httpd{method='GET',path_parts=[_,<<"_purged_infos_limit">>]}=Req, Db) ->
+    send_json(Req, fabric:get_purge_infos_limit(Db));
+
 % Special case to enable using an unencoded slash in the URL of design docs,
 % as slashes in document IDs must otherwise be URL encoded.
 db_req(#httpd{method='GET', mochi_req=MochiReq, path_parts=[_DbName, <<"_design/", _/binary>> | _]}=Req, _Db) ->
@@ -993,6 +1015,20 @@ update_doc_result_to_json(DocId, Error) ->
     {_Code, ErrorStr, Reason} = chttpd:error_info(Error),
     {[{id, DocId}, {error, ErrorStr}, {reason, Reason}]}.
 
+purge_results_to_json([], []) ->
+    {201, []};
+purge_results_to_json([{DocId, _Revs} | RIn], [{ok, PRevs} | ROut]) ->
+    {Code, Results} = purge_results_to_json(RIn, ROut),
+    {Code, [{DocId, couch_doc:revs_to_strs(PRevs)} | Results]};
+purge_results_to_json([{DocId, _Revs} | RIn], [{accepted, PRevs} | ROut]) ->
+    {Code, Results} = purge_results_to_json(RIn, ROut),
+    NewResults = [{DocId, couch_doc:revs_to_strs(PRevs)} | Results],
+    {erlang:max(Code, 202), NewResults};
+purge_results_to_json([{DocId, _Revs} | RIn], [Error | ROut]) ->
+    {Code, Results} = purge_results_to_json(RIn, ROut),
+    {NewCode, ErrorStr, Reason} = chttpd:error_info(Error),
+    NewResults = [{DocId, {[{error, ErrorStr}, {reason, Reason}]}} | Results],
+    {erlang:max(NewCode, Code), NewResults}.
 
 send_updated_doc(Req, Db, DocId, Json) ->
     send_updated_doc(Req, Db, DocId, Json, []).
diff --git a/src/chttpd/test/chttpd_purge_tests.erl b/src/chttpd/test/chttpd_purge_tests.erl
new file mode 100644
index 0000000..6865525
--- /dev/null
+++ b/src/chttpd/test/chttpd_purge_tests.erl
@@ -0,0 +1,320 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(chttpd_purge_tests).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(USER, "chttpd_db_test_admin").
+-define(PASS, "pass").
+-define(AUTH, {basic_auth, {?USER, ?PASS}}).
+-define(CONTENT_JSON, {"Content-Type", "application/json"}).
+
+
+setup() ->
+    ok = config:set("admins", ?USER, ?PASS, _Persist=false),
+    TmpDb = ?tempdb(),
+    Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
+    Port = mochiweb_socket_server:get(chttpd, port),
+    Url = lists:concat(["http://", Addr, ":", Port, "/", ?b2l(TmpDb)]),
+    create_db(Url),
+    Url.
+
+
+teardown(Url) ->
+    delete_db(Url),
+    ok = config:delete("admins", ?USER, _Persist=false).
+
+
+create_db(Url) ->
+    {ok, Status, _, _} = test_request:put(Url, [?CONTENT_JSON, ?AUTH], "{}"),
+    ?assert(Status =:= 201 orelse Status =:= 202).
+
+
+create_doc(Url, Id) ->
+    test_request:put(Url ++ "/" ++ Id,
+        [?CONTENT_JSON, ?AUTH], "{\"mr\": \"rockoartischocko\"}").
+
+create_doc(Url, Id, Content) ->
+    test_request:put(Url ++ "/" ++ Id,
+        [?CONTENT_JSON, ?AUTH], "{\"mr\": \"" ++ Content ++ "\"}").
+
+
+delete_db(Url) ->
+    {ok, 200, _, _} = test_request:delete(Url, [?AUTH]).
+
+
+purge_test_() ->
+    {
+        "chttpd db tests",
+        {
+            setup,
+            fun chttpd_test_util:start_couch/0,
+            fun chttpd_test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0,
+                fun teardown/1,
+                [
+                    fun test_empty_purge_request/1,
+                    fun test_ok_purge_request/1,
+                    fun test_partial_purge_request/1,
+                    fun test_mixed_purge_request/1,
+                    fun test_overmany_ids_or_revs_purge_request/1,
+                    fun test_exceed_limits_on_purge_infos/1,
+                    fun should_error_set_purged_docs_limit_to0/1
+                ]
+            }
+        }
+    }.
+
+
+test_empty_purge_request(Url) ->
+    ?_test(begin
+        IdsRevs = "{}",
+        {ok, Status, _, ResultBody} = test_request:post(Url ++ "/_purge/",
+            [?CONTENT_JSON, ?AUTH], IdsRevs),
+        ResultJson = ?JSON_DECODE(ResultBody),
+        ?assert(Status =:= 201 orelse Status =:= 202),
+        ?assertEqual(
+                {[
+                    {<<"purge_seq">>, null},
+                    {<<"purged">>,{[]}}
+                ]},
+                ResultJson
+            )
+    end).
+
+
+test_ok_purge_request(Url) ->
+    ?_test(begin
+        {ok, _, _, Body} = create_doc(Url, "doc1"),
+        {Json} = ?JSON_DECODE(Body),
+        Rev1 = couch_util:get_value(<<"rev">>, Json, undefined),
+        {ok, _, _, Body2} = create_doc(Url, "doc2"),
+        {Json2} = ?JSON_DECODE(Body2),
+        Rev2 = couch_util:get_value(<<"rev">>, Json2, undefined),
+        {ok, _, _, Body3} = create_doc(Url, "doc3"),
+        {Json3} = ?JSON_DECODE(Body3),
+        Rev3 = couch_util:get_value(<<"rev">>, Json3, undefined),
+
+        IdsRevsEJson = {[
+            {<<"doc1">>, [Rev1]},
+            {<<"doc2">>, [Rev2]},
+            {<<"doc3">>, [Rev3]}
+        ]},
+        IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)),
+
+        {ok, Status, _, ResultBody} = test_request:post(Url ++ "/_purge/",
+            [?CONTENT_JSON, ?AUTH], IdsRevs),
+        ResultJson = ?JSON_DECODE(ResultBody),
+        ?assert(Status =:= 201 orelse Status =:= 202),
+        ?assertEqual(
+                {[
+                    {<<"purge_seq">>, null},
+                    {<<"purged">>, {[
+                        {<<"doc1">>, [Rev1]},
+                        {<<"doc2">>, [Rev2]},
+                        {<<"doc3">>, [Rev3]}
+                    ]}}
+                ]},
+                ResultJson
+            )
+    end).
+
+
+test_partial_purge_request(Url) ->
+    ?_test(begin
+        {ok, _, _, Body} = create_doc(Url, "doc1"),
+        {Json} = ?JSON_DECODE(Body),
+        Rev1 = couch_util:get_value(<<"rev">>, Json, undefined),
+
+        NewDoc = "{\"new_edits\": false, \"docs\": [{\"_id\": \"doc1\",
+            \"_revisions\": {\"start\": 1, \"ids\": [\"12345\", \"67890\"]},
+            \"content\": \"updated\", \"_rev\": \"" ++ ?b2l(Rev1) ++ "\"}]}",
+        {ok, _, _, _} = test_request:post(Url ++ "/_bulk_docs/",
+            [?CONTENT_JSON, ?AUTH], NewDoc),
+
+        IdsRevsEJson = {[{<<"doc1">>, [Rev1]}]},
+        IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)),
+        {ok, Status, _, ResultBody} = test_request:post(Url ++ "/_purge/",
+            [?CONTENT_JSON, ?AUTH], IdsRevs),
+        ResultJson = ?JSON_DECODE(ResultBody),
+        ?assert(Status =:= 201 orelse Status =:= 202),
+        ?assertEqual(
+            {[
+                {<<"purge_seq">>, null},
+                {<<"purged">>, {[
+                    {<<"doc1">>, [Rev1]}
+                ]}}
+            ]},
+            ResultJson
+        ),
+        {ok, Status2, _, ResultBody2} = test_request:get(Url
+            ++ "/doc1/", [?AUTH]),
+        {Json2} = ?JSON_DECODE(ResultBody2),
+        Content = couch_util:get_value(<<"content">>, Json2, undefined),
+        ?assertEqual(<<"updated">>, Content),
+        ?assert(Status2 =:= 200)
+    end).
+
+
+test_mixed_purge_request(Url) ->
+    ?_test(begin
+        {ok, _, _, Body} = create_doc(Url, "doc1"),
+        {Json} = ?JSON_DECODE(Body),
+        Rev1 = couch_util:get_value(<<"rev">>, Json, undefined),
+
+        NewDoc = "{\"new_edits\": false, \"docs\": [{\"_id\": \"doc1\",
+            \"_revisions\": {\"start\": 1, \"ids\": [\"12345\", \"67890\"]},
+            \"content\": \"updated\", \"_rev\": \"" ++ ?b2l(Rev1) ++ "\"}]}",
+        {ok, _, _, _} = test_request:post(Url ++ "/_bulk_docs/",
+            [?CONTENT_JSON, ?AUTH], NewDoc),
+
+        {ok, _, _, _Body2} = create_doc(Url, "doc2", "content2"),
+        {ok, _, _, Body3} = create_doc(Url, "doc3", "content3"),
+        {Json3} = ?JSON_DECODE(Body3),
+        Rev3 = couch_util:get_value(<<"rev">>, Json3, undefined),
+
+
+        IdsRevsEJson = {[
+            {<<"doc1">>, [Rev1]},  % partial purge
+            {<<"doc2">>, [Rev3, Rev1]},  % correct format, but invalid rev
+            {<<"doc3">>, [Rev3]}   % correct format and rev
+        ]},
+        IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)),
+        {ok, Status, _, Body4} = test_request:post(Url ++ "/_purge/",
+            [?CONTENT_JSON, ?AUTH], IdsRevs),
+        ResultJson = ?JSON_DECODE(Body4),
+        ?assert(Status =:= 201 orelse Status =:= 202),
+        ?assertEqual(
+            {[
+                {<<"purge_seq">>, null},
+                {<<"purged">>, {[
+                    {<<"doc1">>, [Rev1]},
+                    {<<"doc2">>, []},
+                    {<<"doc3">>, [Rev3]}
+                ]}}
+            ]},
+            ResultJson
+        ),
+        {ok, Status2, _, Body5} = test_request:get(Url
+            ++ "/doc1/", [?AUTH]),
+        {Json5} = ?JSON_DECODE(Body5),
+        Content = couch_util:get_value(<<"content">>, Json5, undefined),
+        ?assertEqual(<<"updated">>, Content),
+        ?assert(Status2 =:= 200)
+    end).
+
+
+test_overmany_ids_or_revs_purge_request(Url) ->
+    ?_test(begin
+        {ok, _, _, Body} = create_doc(Url, "doc1"),
+        {Json} = ?JSON_DECODE(Body),
+        Rev1 = couch_util:get_value(<<"rev">>, Json, undefined),
+
+        NewDoc = "{\"new_edits\": false, \"docs\": [{\"_id\": \"doc1\",
+            \"_revisions\": {\"start\": 1, \"ids\": [\"12345\", \"67890\"]},
+            \"content\": \"updated\", \"_rev\": \"" ++ ?b2l(Rev1) ++ "\"}]}",
+        {ok, _, _, _} = test_request:post(Url ++ "/_bulk_docs/",
+            [?CONTENT_JSON, ?AUTH], NewDoc),
+
+        {ok, _, _, _Body2} = create_doc(Url, "doc2", "content2"),
+        {ok, _, _, Body3} = create_doc(Url, "doc3", "content3"),
+        {Json3} = ?JSON_DECODE(Body3),
+        Rev3 = couch_util:get_value(<<"rev">>, Json3, undefined),
+
+        IdsRevsEJson = {[
+            {<<"doc1">>, [Rev1]},  % partial purge
+            {<<"doc2">>, [Rev3, Rev1]},  % correct format, but invalid rev
+            {<<"doc3">>, [Rev3]}   % correct format and rev
+        ]},
+        IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)),
+
+        % Ids larger than expected
+        config:set("purge", "max_document_id_number", "1"),
+        {ok, Status, _, Body4} = test_request:post(Url ++ "/_purge/",
+            [?CONTENT_JSON, ?AUTH], IdsRevs),
+        config:delete("purge", "max_document_id_number"),
+        ResultJson = ?JSON_DECODE(Body4),
+        ?assertEqual(400, Status),
+        ?assertMatch({[
+            {<<"error">>,<<"bad_request">>},
+            {<<"reason">>,<<"Exceeded maximum number of documents.">>}]},
+            ResultJson),
+
+        % Revs larger than expected
+        config:set("purge", "max_revisions_number", "1"),
+        {ok, Status2, _, Body5} = test_request:post(Url ++ "/_purge/",
+            [?CONTENT_JSON, ?AUTH], IdsRevs),
+        config:delete("purge", "max_revisions_number"),
+        ResultJson2 = ?JSON_DECODE(Body5),
+        ?assertEqual(400, Status2),
+        ?assertMatch({[
+            {<<"error">>,<<"bad_request">>},
+            {<<"reason">>,<<"Exceeded maximum number of revisions.">>}]},
+            ResultJson2)
+    end).
+
+
+test_exceed_limits_on_purge_infos(Url) ->
+    ?_test(begin
+        {ok, Status1, _, _} = test_request:put(Url ++ "/_purged_infos_limit/",
+            [?CONTENT_JSON, ?AUTH], "2"),
+        ?assert(Status1 =:= 200),
+
+        {ok, _, _, Body} = create_doc(Url, "doc1"),
+        {Json} = ?JSON_DECODE(Body),
+        Rev1 = couch_util:get_value(<<"rev">>, Json, undefined),
+        {ok, _, _, Body2} = create_doc(Url, "doc2"),
+        {Json2} = ?JSON_DECODE(Body2),
+        Rev2 = couch_util:get_value(<<"rev">>, Json2, undefined),
+        {ok, _, _, Body3} = create_doc(Url, "doc3"),
+        {Json3} = ?JSON_DECODE(Body3),
+        Rev3 = couch_util:get_value(<<"rev">>, Json3, undefined),
+
+        IdsRevsEJson = {[
+            {<<"doc1">>, [Rev1]},
+            {<<"doc2">>, [Rev2]},
+            {<<"doc3">>, [Rev3]}
+        ]},
+        IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)),
+
+        {ok, Status2, _, ResultBody} = test_request:post(Url ++ "/_purge/",
+            [?CONTENT_JSON, ?AUTH], IdsRevs),
+
+        ResultJson = ?JSON_DECODE(ResultBody),
+        ?assert(Status2 =:= 201 orelse Status2 =:= 202),
+        ?assertEqual(
+            {[
+                {<<"purge_seq">>, null},
+                {<<"purged">>, {[
+                    {<<"doc1">>, [Rev1]},
+                    {<<"doc2">>, [Rev2]},
+                    {<<"doc3">>, [Rev3]}
+                ]}}
+            ]},
+            ResultJson
+        )
+
+    end).
+
+
+should_error_set_purged_docs_limit_to0(Url) ->
+    ?_test(begin
+        {ok, Status, _, _} = test_request:put(Url ++ "/_purged_infos_limit/",
+            [?CONTENT_JSON, ?AUTH], "0"),
+        ?assert(Status =:= 400)
+    end).
\ No newline at end of file
diff --git a/test/javascript/tests/erlang_views.js b/test/javascript/tests/erlang_views.js
index ec78e65..9b15e10 100644
--- a/test/javascript/tests/erlang_views.js
+++ b/test/javascript/tests/erlang_views.js
@@ -56,7 +56,7 @@ couchTests.erlang_views = function(debug) {
             '  {Info} = couch_util:get_value(<<"info">>, Req, {[]}), ' +
             '  Purged = couch_util:get_value(<<"purge_seq">>, Info, -1), ' +
             '  Verb = couch_util:get_value(<<"method">>, Req, <<"not_get">>), ' +
-            '  R = list_to_binary(io_lib:format("~b - ~s", [Purged, Verb])), ' +
+            '  R = list_to_binary(io_lib:format("~s - ~s", [Purged, Verb])), ' +
             '  {[{<<"code">>, 200}, {<<"headers">>, {[]}}, {<<"body">>, R}]} ' +
             'end.'
         },
@@ -85,7 +85,8 @@ couchTests.erlang_views = function(debug) {
       var url = "/" + db_name + "/_design/erlview/_show/simple/1";
       var xhr = CouchDB.request("GET", url);
       T(xhr.status == 200, "standard get should be 200");
-      T(xhr.responseText == "0 - GET");
+      T(/0-/.test(xhr.responseText));
+      T(/- GET/.test(xhr.responseText));
 
       var url = "/" + db_name + "/_design/erlview/_list/simple_list/simple_view";
       var xhr = CouchDB.request("GET", url);
diff --git a/test/javascript/tests/purge.js b/test/javascript/tests/purge.js
index 38eca8d..0c11d9a 100644
--- a/test/javascript/tests/purge.js
+++ b/test/javascript/tests/purge.js
@@ -11,7 +11,6 @@
 // the License.
 
 couchTests.purge = function(debug) {
-  return console.log('TODO: this feature is not yet implemented');
   var db_name = get_random_db_name();
   var db = new CouchDB(db_name, {"X-Couch-Full-Commit":"false"});
   db.createDb();
@@ -53,21 +52,13 @@ couchTests.purge = function(debug) {
   var xhr = CouchDB.request("POST", "/" + db_name + "/_purge", {
     body: JSON.stringify({"1":[doc1._rev], "2":[doc2._rev]})
   });
-  console.log(xhr.status);
-  console.log(xhr.responseText);
-  T(xhr.status == 200);
+  T(xhr.status == 201);
 
   var result = JSON.parse(xhr.responseText);
   var newInfo = db.info();
-  
-  // purging increments the update sequence
-  T(info.update_seq+1 == newInfo.update_seq);
-  // and it increments the purge_seq
-  T(info.purge_seq+1 == newInfo.purge_seq);
-  T(result.purge_seq == newInfo.purge_seq);
 
-  T(result.purged["1"][0] == doc1._rev);
-  T(result.purged["2"][0] == doc2._rev);
+  T(result.purged["1"] == doc1._rev);
+  T(result.purged["2"] == doc2._rev);
 
   T(db.open("1") == null);
   T(db.open("2") == null);
@@ -85,7 +76,6 @@ couchTests.purge = function(debug) {
   // compaction isn't instantaneous, loop until done
   while (db.info().compact_running) {};
   var compactInfo = db.info();
-  T(compactInfo.purge_seq == newInfo.purge_seq);
 
   // purge documents twice in a row without loading views
   // (causes full view rebuilds)
@@ -97,15 +87,14 @@ couchTests.purge = function(debug) {
     body: JSON.stringify({"3":[doc3._rev]})
   });
 
-  T(xhr.status == 200);
+  T(xhr.status == 201);
 
   xhr = CouchDB.request("POST", "/" + db_name + "/_purge", {
     body: JSON.stringify({"4":[doc4._rev]})
   });
 
-  T(xhr.status == 200);
+  T(xhr.status == 201);
   result = JSON.parse(xhr.responseText);
-  T(result.purge_seq == db.info().purge_seq);
 
   var rows = db.view("test/all_docs_twice").rows;
   for (var i = 4; i < numDocs; i++) {
@@ -129,7 +118,7 @@ couchTests.purge = function(debug) {
   var xhr = CouchDB.request("POST", "/" + dbB.name + "/_purge", {
     body: JSON.stringify({"test":[docA._rev]})
   });
-  TEquals(200, xhr.status, "single rev purge after replication succeeds");
+  TEquals(201, xhr.status, "single rev purge after replication succeeds");
 
   var xhr = CouchDB.request("GET", "/" + dbB.name + "/test?rev=" + docA._rev);
   TEquals(404, xhr.status, "single rev purge removes revision");
@@ -137,14 +126,14 @@ couchTests.purge = function(debug) {
   var xhr = CouchDB.request("POST", "/" + dbB.name + "/_purge", {
     body: JSON.stringify({"test":[docB._rev]})
   });
-  TEquals(200, xhr.status, "single rev purge after replication succeeds");
+  TEquals(201, xhr.status, "single rev purge after replication succeeds");
   var xhr = CouchDB.request("GET", "/" + dbB.name + "/test?rev=" + docB._rev);
   TEquals(404, xhr.status, "single rev purge removes revision");
 
   var xhr = CouchDB.request("POST", "/" + dbB.name + "/_purge", {
     body: JSON.stringify({"test":[docA._rev, docB._rev]})
   });
-  TEquals(200, xhr.status, "all rev purge after replication succeeds");
+  TEquals(201, xhr.status, "all rev purge after replication succeeds");
 
   // cleanup
   db.deleteDb();


[couchdb] 02/06: [06/10] Clustered Purge: Update mrview indexes

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0cc7757c789ea7259e6dd8f2c9ef19750a5f8898
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Apr 24 12:26:42 2018 -0500

    [06/10] Clustered Purge: Update mrview indexes
    
    This commit updates the mrview secondary index to properly process the
    new history of purge requests as well as to store the _local purge
    checkpoint doc.
    
    The importance of the _local checkpoint doc is to ensure that compaction
    of a database does not remove any purge requests that have not yet been
    processed by this secondary index.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
    Co-authored-by: jiangphcn <ji...@cn.ibm.com>
---
 src/couch_index/src/couch_index_epi.erl            |   5 +-
 ...dex_epi.erl => couch_index_plugin_couch_db.erl} |  39 +-
 src/couch_index/src/couch_index_updater.erl        |  50 +-
 src/couch_mrview/src/couch_mrview_cleanup.erl      |  16 +-
 src/couch_mrview/src/couch_mrview_index.erl        | 115 ++++-
 src/couch_mrview/src/couch_mrview_test_util.erl    |   5 +
 src/couch_mrview/src/couch_mrview_updater.erl      |  14 +-
 src/couch_mrview/src/couch_mrview_util.erl         |  39 +-
 .../test/couch_mrview_purge_docs_fabric_tests.erl  | 276 +++++++++++
 .../test/couch_mrview_purge_docs_tests.erl         | 506 +++++++++++++++++++++
 10 files changed, 1004 insertions(+), 61 deletions(-)

diff --git a/src/couch_index/src/couch_index_epi.erl b/src/couch_index/src/couch_index_epi.erl
index 946a590..1c4eb95 100644
--- a/src/couch_index/src/couch_index_epi.erl
+++ b/src/couch_index/src/couch_index_epi.erl
@@ -28,8 +28,9 @@ app() ->
     couch_index.
 
 providers() ->
-    [].
-
+    [
+        {couch_db, couch_index_plugin_couch_db}
+    ].
 
 services() ->
     [
diff --git a/src/couch_index/src/couch_index_epi.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl
similarity index 52%
copy from src/couch_index/src/couch_index_epi.erl
copy to src/couch_index/src/couch_index_plugin_couch_db.erl
index 946a590..0af22e3 100644
--- a/src/couch_index/src/couch_index_epi.erl
+++ b/src/couch_index/src/couch_index_plugin_couch_db.erl
@@ -2,7 +2,7 @@
 % use this file except in compliance with the License. You may obtain a copy of
 % the License at
 %
-% http://www.apache.org/licenses/LICENSE-2.0
+%   http://www.apache.org/licenses/LICENSE-2.0
 %
 % Unless required by applicable law or agreed to in writing, software
 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
@@ -10,40 +10,17 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(couch_index_epi).
-
--behaviour(couch_epi_plugin).
+-module(couch_index_plugin_couch_db).
 
 -export([
-    app/0,
-    providers/0,
-    services/0,
-    data_subscriptions/0,
-    data_providers/0,
-    processes/0,
-    notify/3
+    is_valid_purge_client/2,
+    on_compact/2
 ]).
 
-app() ->
-    couch_index.
-
-providers() ->
-    [].
-
-
-services() ->
-    [
-        {couch_index, couch_index_plugin}
-    ].
-
-data_subscriptions() ->
-    [].
 
-data_providers() ->
-    [].
+is_valid_purge_client(DbName, Props) ->
+    couch_mrview_index:verify_index_exists(DbName, Props).
 
-processes() ->
-    [].
 
-notify(_Key, _Old, _New) ->
-    ok.
+on_compact(DbName, DDocs) ->
+    couch_mrview_index:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl
index 5ab9ea8..7864bde 100644
--- a/src/couch_index/src/couch_index_updater.erl
+++ b/src/couch_index/src/couch_index_updater.erl
@@ -141,12 +141,10 @@ update(Idx, Mod, IdxState) ->
         DbUpdateSeq = couch_db:get_update_seq(Db),
         DbCommittedSeq = couch_db:get_committed_update_seq(Db),
 
-        PurgedIdxState = case purge_index(Db, Mod, IdxState) of
-            {ok, IdxState0} -> IdxState0;
-            reset -> exit({reset, self()})
-        end,
-
-        NumChanges = couch_db:count_changes_since(Db, CurrSeq),
+        NumUpdateChanges = couch_db:count_changes_since(Db, CurrSeq),
+        NumPurgeChanges = count_pending_purged_docs_since(Db, Mod, IdxState),
+        TotalChanges = NumUpdateChanges + NumPurgeChanges,
+        {ok, PurgedIdxState} = purge_index(Db, Mod, IdxState),
 
         GetSeq = fun
             (#full_doc_info{update_seq=Seq}) -> Seq;
@@ -185,8 +183,13 @@ update(Idx, Mod, IdxState) ->
                     {ok, {NewSt, true}}
             end
         end,
+        {ok, InitIdxState} = Mod:start_update(
+                Idx,
+                PurgedIdxState,
+                TotalChanges,
+                NumPurgeChanges
+            ),
 
-        {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges),
         Acc0 = {InitIdxState, true},
         {ok, Acc} = couch_db:fold_changes(Db, CurrSeq, Proc, Acc0, []),
         {ProcIdxSt, SendLast} = Acc,
@@ -206,14 +209,29 @@ update(Idx, Mod, IdxState) ->
 
 
 purge_index(Db, Mod, IdxState) ->
-    {ok, DbPurgeSeq} = couch_db:get_purge_seq(Db),
+    DbPurgeSeq = couch_db:get_purge_seq(Db),
     IdxPurgeSeq = Mod:get(purge_seq, IdxState),
-    if
-        DbPurgeSeq == IdxPurgeSeq ->
-            {ok, IdxState};
-        DbPurgeSeq == IdxPurgeSeq + 1 ->
-            {ok, PurgedIdRevs} = couch_db:get_last_purged(Db),
-            Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState);
-        true ->
-            reset
+    if IdxPurgeSeq == DbPurgeSeq -> {ok, IdxState}; true ->
+        FoldFun = fun({PurgeSeq, _UUId, Id, Revs}, Acc) ->
+            Mod:purge(Db, PurgeSeq, [{Id, Revs}], Acc)
+        end,
+        {ok, NewStateAcc} = try
+            couch_db:fold_purge_infos(
+                    Db,
+                    IdxPurgeSeq,
+                    FoldFun,
+                    IdxState,
+                    []
+                )
+        catch error:{invalid_start_purge_seq, _} ->
+            exit({reset, self()})
+        end,
+        Mod:update_local_purge_doc(Db, NewStateAcc),
+        {ok, NewStateAcc}
     end.
+
+
+count_pending_purged_docs_since(Db, Mod, IdxState) ->
+    DbPurgeSeq = couch_db:get_purge_seq(Db),
+    IdxPurgeSeq = Mod:get(purge_seq, IdxState),
+    DbPurgeSeq - IdxPurgeSeq.
diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl
index 380376d..e0cb1c6 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -41,7 +41,19 @@ run(Db) ->
 
     lists:foreach(fun(FN) ->
         couch_log:debug("Deleting stale view file: ~s", [FN]),
-        couch_file:delete(RootDir, FN, [sync])
+        couch_file:delete(RootDir, FN, [sync]),
+        case couch_mrview_util:verify_view_filename(FN) of
+            true ->
+                Sig = couch_mrview_util:get_signature_from_filename(FN),
+                DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+                case couch_db:open_doc(Db, DocId, []) of
+                    {ok, LocalPurgeDoc} ->
+                        couch_db:update_doc(Db,
+                            LocalPurgeDoc#doc{deleted=true}, [?ADMIN_CTX]);
+                    {not_found, _} ->
+                        ok
+                end;
+            false -> ok
+        end
     end, ToDelete),
-
     ok.
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index aa1ee27..12e0e1d 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -15,9 +15,11 @@
 
 -export([get/2]).
 -export([init/2, open/2, close/1, reset/1, delete/1, shutdown/1]).
--export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]).
+-export([start_update/4, purge/4, process_doc/3, finish_update/1, commit/1]).
 -export([compact/3, swap_compacted/2, remove_compacted/1]).
 -export([index_file_exists/1]).
+-export([update_local_purge_doc/2, verify_index_exists/2]).
+-export([ensure_local_purge_docs/2]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -121,14 +123,17 @@ open(Db, State) ->
                 {ok, {OldSig, Header}} ->
                     % Matching view signatures.
                     NewSt = couch_mrview_util:init_state(Db, Fd, State, Header),
+                    ensure_local_purge_doc(Db, NewSt),
                     {ok, NewSt};
                 % end of upgrade code for <= 1.2.x
                 {ok, {Sig, Header}} ->
                     % Matching view signatures.
                     NewSt = couch_mrview_util:init_state(Db, Fd, State, Header),
+                    ensure_local_purge_doc(Db, NewSt),
                     {ok, NewSt};
                 _ ->
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
+                    ensure_local_purge_doc(Db, NewSt),
                     {ok, NewSt}
             end;
         {error, Reason} = Error ->
@@ -167,8 +172,13 @@ reset(State) ->
     end).
 
 
-start_update(PartialDest, State, NumChanges) ->
-    couch_mrview_updater:start_update(PartialDest, State, NumChanges).
+start_update(PartialDest, State, NumChanges, NumChangesDone) ->
+    couch_mrview_updater:start_update(
+        PartialDest,
+        State,
+        NumChanges,
+        NumChangesDone
+    ).
 
 
 purge(Db, PurgeSeq, PurgedIdRevs, State) ->
@@ -207,3 +217,102 @@ index_file_exists(State) ->
     } = State,
     IndexFName = couch_mrview_util:index_file(DbName, Sig),
     filelib:is_file(IndexFName).
+
+
+verify_index_exists(DbName, Props) ->
+    try
+        Type = couch_util:get_value(<<"type">>, Props),
+        if Type =/= <<"mrview">> -> false; true ->
+            DDocId = couch_util:get_value(<<"ddoc_id">>, Props),
+            couch_util:with_db(DbName, fun(Db) ->
+                {ok, DesignDocs} = couch_db:get_design_docs(Db),
+                case get_ddoc(DbName, DesignDocs, DDocId) of
+                    #doc{} = DDoc ->
+                        {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(
+                            DbName, DDoc),
+                        IdxSig = IdxState#mrst.sig,
+                        SigInLocal = couch_util:get_value(
+                            <<"signature">>, Props),
+                        couch_index_util:hexsig(IdxSig) == SigInLocal;
+                    not_found ->
+                        false
+                end
+            end)
+        end
+    catch _:_ ->
+        false
+    end.
+
+
+get_ddoc(<<"shards/", _/binary>> = _DbName, DesignDocs, DDocId) ->
+    DDocs = [couch_doc:from_json_obj(DD) || DD <- DesignDocs],
+    case lists:keyfind(DDocId, #doc.id, DDocs) of
+        #doc{} = DDoc -> DDoc;
+        false -> not_found
+    end;
+get_ddoc(DbName, DesignDocs, DDocId) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        case lists:keyfind(DDocId, #full_doc_info.id, DesignDocs) of
+            #full_doc_info{} = DDocInfo ->
+                {ok, DDoc} = couch_db:open_doc_int(
+                    Db, DDocInfo, [ejson_body]),
+                    DDoc;
+            false ->
+                not_found
+        end
+    end).
+
+
+ensure_local_purge_docs(DbName, DDocs) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        lists:foreach(fun(DDoc) ->
+            try couch_mrview_util:ddoc_to_mrst(DbName, DDoc) of
+                {ok, MRSt} ->
+                    ensure_local_purge_doc(Db, MRSt)
+            catch _:_ ->
+                ok
+            end
+        end, DDocs)
+    end).
+
+
+ensure_local_purge_doc(Db, #mrst{}=State) ->
+    Sig = couch_index_util:hexsig(get(signature, State)),
+    DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+    case couch_db:open_doc(Db, DocId, []) of
+        {not_found, _} ->
+            create_local_purge_doc(Db, State);
+        {ok, _} ->
+            ok
+    end.
+
+
+create_local_purge_doc(Db, State) ->
+    PurgeSeq = couch_db:get_purge_seq(Db),
+    update_local_purge_doc(Db, State, PurgeSeq).
+
+
+update_local_purge_doc(Db, State) ->
+    update_local_purge_doc(Db, State, get(purge_seq, State)).
+
+
+update_local_purge_doc(Db, State, PSeq) ->
+    Sig = couch_index_util:hexsig(State#mrst.sig),
+    DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    BaseDoc = couch_doc:from_json_obj({[
+        {<<"_id">>, DocId},
+        {<<"type">>, <<"mrview">>},
+        {<<"purge_seq">>, PSeq},
+        {<<"updated_on">>, NowSecs},
+        {<<"ddoc_id">>, get(idx_name, State)},
+        {<<"signature">>, Sig}
+    ]}),
+    Doc = case couch_db:open_doc(Db, DocId, []) of
+        {ok, #doc{revs = Revs}} ->
+            BaseDoc#doc{revs = Revs};
+        {not_found, _} ->
+            BaseDoc
+    end,
+    couch_db:update_doc(Db, Doc, []).
diff --git a/src/couch_mrview/src/couch_mrview_test_util.erl b/src/couch_mrview/src/couch_mrview_test_util.erl
index b07b076..35ab6c6 100644
--- a/src/couch_mrview/src/couch_mrview_test_util.erl
+++ b/src/couch_mrview/src/couch_mrview_test_util.erl
@@ -49,6 +49,11 @@ make_docs(local, Count) ->
 make_docs(_, Count) ->
     [doc(I) || I <- lists:seq(1, Count)].
 
+
+make_docs(_, Since, Count) ->
+    [doc(I) || I <- lists:seq(Since, Count)].
+        
+
 ddoc({changes, Opts}) ->
     ViewOpts = case Opts of
         seq_indexed ->
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 214f487..3383b49 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -12,15 +12,14 @@
 
 -module(couch_mrview_updater).
 
--export([start_update/3, purge/4, process_doc/3, finish_update/1]).
+-export([start_update/4, purge/4, process_doc/3, finish_update/1]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 -define(REM_VAL, removed).
 
-
-start_update(Partial, State, NumChanges) ->
+start_update(Partial, State, NumChanges, NumChangesDone) ->
     MaxSize = config:get_integer("view_updater", "queue_memory_cap", 100000),
     MaxItems = config:get_integer("view_updater", "queue_item_cap", 500),
     QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}],
@@ -36,14 +35,19 @@ start_update(Partial, State, NumChanges) ->
     },
 
     Self = self(),
+
     MapFun = fun() ->
+        Progress = case NumChanges of
+            0 -> 0;
+            _ -> (NumChangesDone * 100) div NumChanges
+        end,
         couch_task_status:add_task([
             {indexer_pid, ?l2b(pid_to_list(Partial))},
             {type, indexer},
             {database, State#mrst.db_name},
             {design_document, State#mrst.idx_name},
-            {progress, 0},
-            {changes_done, 0},
+            {progress, Progress},
+            {changes_done, NumChangesDone},
             {total_changes, NumChanges}
         ]),
         couch_task_status:set_update_frequency(500),
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 0c6e5fc..fe50662 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -13,6 +13,8 @@
 -module(couch_mrview_util).
 
 -export([get_view/4, get_view_index_pid/4]).
+-export([get_local_purge_doc_id/1, get_value_from_options/2]).
+-export([verify_view_filename/1, get_signature_from_filename/1]).
 -export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
 -export([make_header/1]).
 -export([index_file/2, compaction_file/2, open_file/1]).
@@ -41,6 +43,39 @@
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 
+get_local_purge_doc_id(Sig) ->
+    ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig).
+
+
+get_value_from_options(Key, Options) ->
+    case couch_util:get_value(Key, Options) of
+        undefined ->
+            Reason = <<"'", Key/binary, "' must exists in options.">>,
+            throw({bad_request, Reason});
+        Value -> Value
+    end.
+
+
+verify_view_filename(FileName) ->
+    FilePathList = filename:split(FileName),
+    PureFN = lists:last(FilePathList),
+    case filename:extension(PureFN) of
+        ".view" ->
+            Sig = filename:basename(PureFN),
+            case [Ch || Ch <- Sig, not (((Ch >= $0) and (Ch =< $9))
+                orelse ((Ch >= $a) and (Ch =< $f))
+                orelse ((Ch >= $A) and (Ch =< $F)))] == [] of
+                true -> true;
+                false -> false
+            end;
+        _ -> false
+    end.
+
+get_signature_from_filename(FileName) ->
+    FilePathList = filename:split(FileName),
+    PureFN = lists:last(FilePathList),
+    filename:basename(PureFN, ".view").
+
 get_view(Db, DDoc, ViewName, Args0) ->
     case get_view_index_state(Db, DDoc, ViewName, Args0) of
         {ok, State, Args2} ->
@@ -196,7 +231,7 @@ extract_view(Lang, #mrargs{view_type=red}=Args, Name, [View | Rest]) ->
 view_sig(Db, State, View, #mrargs{include_docs=true}=Args) ->
     BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}),
     UpdateSeq = couch_db:get_update_seq(Db),
-    {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+    PurgeSeq = couch_db:get_purge_seq(Db),
     #mrst{
         seq_indexed=SeqIndexed,
         keyseq_indexed=KeySeqIndexed
@@ -230,7 +265,7 @@ view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args) ->
 
 
 init_state(Db, Fd, #mrst{views=Views}=State, nil) ->
-    {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+    PurgeSeq = couch_db:get_purge_seq(Db),
     Header = #mrheader{
         seq=0,
         purge_seq=PurgeSeq,
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
new file mode 100644
index 0000000..213acac
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
@@ -0,0 +1,276 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_purge_docs_fabric_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+    DbName = ?tempdb(),
+    ok = fabric:create_db(DbName, [?ADMIN_CTX, {q, 1}]),
+    meck:new(couch_mrview_index, [passthrough]),
+    meck:expect(couch_mrview_index, ensure_local_purge_docs, fun(A, B) ->
+        meck:passthrough([A, B])
+    end),
+    DbName.
+
+
+teardown(DbName) ->
+    meck:unload(),
+    ok = fabric:delete_db(DbName, [?ADMIN_CTX]).
+
+
+view_purge_fabric_test_() ->
+    {
+        "Map views",
+        {
+            setup,
+            fun() -> test_util:start_couch([fabric, mem3]) end,
+            fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0,
+                fun teardown/1,
+                [
+                    fun test_purge_verify_index/1,
+                    fun test_purge_hook_before_compaction/1
+                ]
+            }
+        }
+    }.
+
+
+test_purge_verify_index(DbName) ->
+    ?_test(begin
+        Docs1 = couch_mrview_test_util:make_docs(normal, 5),
+        {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]),
+        {ok, _} = fabric:update_doc(
+            DbName,
+            couch_mrview_test_util:ddoc(map),
+            [?ADMIN_CTX]
+        ),
+
+        Result1 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect1 = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect1, Result1),
+
+        {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName),
+        ?assertEqual(0, couch_util:get_value(<<"purge_seq">>, Props1)),
+        ShardNames = [Sh || #shard{name = Sh} <- mem3:local_shards(DbName)],
+        [ShardDbName | _Rest ] = ShardNames,
+        ?assertEqual(true, couch_mrview_index:verify_index_exists(
+            ShardDbName, Props1)),
+
+        purge_docs(DbName, [<<"1">>]),
+
+        Result2 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect2 = {ok, [
+            {meta, [{total, 4}, {offset, 0}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2),
+
+        {ok, #doc{body = {Props2}}} = get_local_purge_doc(DbName),
+        ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props2)),
+        ?assertEqual(true, couch_mrview_index:verify_index_exists(
+            ShardDbName, Props2))
+    end).
+
+
+test_purge_hook_before_compaction(DbName) ->
+    ?_test(begin
+        Docs1 = couch_mrview_test_util:make_docs(normal, 5),
+        {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]),
+        {ok, _} = fabric:update_doc(
+            DbName,
+            couch_mrview_test_util:ddoc(map),
+            [?ADMIN_CTX]
+        ),
+
+        Result1 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect1 = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect1, Result1),
+
+        purge_docs(DbName, [<<"1">>]),
+
+        Result2 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect2 = {ok, [
+            {meta, [{total, 4}, {offset, 0}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2),
+
+        {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName),
+        ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props1)),
+
+        [ShardName | _] = local_shards(DbName),
+        couch_util:with_db(ShardName, fun(Db) ->
+            {ok, _} = couch_db:start_compact(Db)
+        end),
+        wait_compaction(ShardName, ?LINE),
+
+        ?assertEqual(ok, meck:wait(1, couch_mrview_index,
+            ensure_local_purge_docs, '_', 5000)
+        ),
+
+        % Make sure compaction didn't change the update seq
+        {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName),
+        ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props1)),
+
+        purge_docs(DbName, [<<"2">>]),
+
+        couch_util:with_db(ShardName, fun(Db) ->
+            {ok, _} = couch_db:start_compact(Db)
+        end),
+        wait_compaction(ShardName, ?LINE),
+
+        ?assertEqual(ok, meck:wait(2, couch_mrview_index,
+            ensure_local_purge_docs, '_', 5000)
+        ),
+
+        % Make sure compaction after a purge didn't overwrite
+        % the local purge doc for the index
+        {ok, #doc{body = {Props2}}} = get_local_purge_doc(DbName),
+        ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props2)),
+
+        % Force another update to ensure that we update
+        % the local doc appropriate after compaction
+        Result3 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect3 = {ok, [
+            {meta, [{total, 3}, {offset, 0}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect3, Result3),
+
+        {ok, #doc{body = {Props3}}} = get_local_purge_doc(DbName),
+        ?assertEqual(2, couch_util:get_value(<<"purge_seq">>, Props3)),
+
+        % Check that if the local doc doesn't exist that one
+        % is created for the index on compaction
+        delete_local_purge_doc(DbName),
+        ?assertMatch({not_found, _}, get_local_purge_doc(DbName)),
+
+        couch_util:with_db(ShardName, fun(Db) ->
+            {ok, _} = couch_db:start_compact(Db)
+        end),
+        wait_compaction(ShardName, ?LINE),
+
+        ?assertEqual(ok, meck:wait(3, couch_mrview_index,
+            ensure_local_purge_docs, '_', 5000)
+        ),
+
+        {ok, #doc{body = {Props4}}} = get_local_purge_doc(DbName),
+        ?assertEqual(2, couch_util:get_value(<<"purge_seq">>, Props4))
+    end).
+
+
+get_local_purge_doc(DbName) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/bar">>, []),
+    {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
+    Sig = IdxState#mrst.sig,
+    HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+    DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+    [ShardName | _] = local_shards(DbName),
+    couch_util:with_db(ShardName, fun(Db) ->
+        couch_db:open_doc(Db, DocId, [])
+    end).
+
+
+delete_local_purge_doc(DbName) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/bar">>, []),
+    {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
+    Sig = IdxState#mrst.sig,
+    HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+    DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+    NewDoc = #doc{id = DocId, deleted = true},
+    [ShardName | _] = local_shards(DbName),
+    couch_util:with_db(ShardName, fun(Db) ->
+        {ok, _} = couch_db:update_doc(Db, NewDoc, [])
+    end).
+
+
+get_rev(#full_doc_info{} = FDI) ->
+    #doc_info{
+        revs = [#rev_info{} = PrevRev | _]
+    } = couch_doc:to_doc_info(FDI),
+    PrevRev#rev_info.rev.
+
+
+purge_docs(DbName, DocIds) ->
+    lists:foreach(fun(DocId) ->
+        FDI = fabric:get_full_doc_info(DbName, DocId, []),
+        Rev = get_rev(FDI),
+        {ok, [{ok, _}]} = fabric:purge_docs(DbName, [{DocId, [Rev]}], [])
+    end, DocIds).
+
+
+wait_compaction(DbName, Line) ->
+    WaitFun = fun() ->
+        case is_compaction_running(DbName) of
+            true -> wait;
+            false -> ok
+        end
+    end,
+    case test_util:wait(WaitFun, 10000) of
+        timeout ->
+            erlang:error({assertion_failed, [
+                    {module, ?MODULE},
+                    {line, Line},
+                    {reason, "Timeout waiting for database compaction"}
+                ]});
+        _ ->
+            ok
+    end.
+
+
+is_compaction_running(DbName) ->
+    {ok, DbInfo} = couch_util:with_db(DbName, fun(Db) ->
+        couch_db:get_db_info(Db)
+    end),
+    couch_util:get_value(compact_running, DbInfo).
+
+
+local_shards(DbName) ->
+    try
+        [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+    catch
+        error:database_does_not_exist ->
+            []
+    end.
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
new file mode 100644
index 0000000..eb180b0
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
@@ -0,0 +1,506 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_purge_docs_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+    meck:new(couch_index_updater, [passthrough]),
+    {ok, Db} = couch_mrview_test_util:init_db(?tempdb(), map, 5),
+    Db.
+
+teardown(Db) ->
+    couch_db:close(Db),
+    couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
+    meck:unload(),
+    ok.
+
+view_purge_test_() ->
+    {
+        "Map views",
+        {
+            setup,
+            fun test_util:start_couch/0,
+            fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0,
+                fun teardown/1,
+                [
+                    fun test_purge_single/1,
+                    fun test_purge_partial/1,
+                    fun test_purge_complete/1,
+                    fun test_purge_nochange/1,
+                    fun test_purge_index_reset/1,
+                    fun test_purge_compact_size_check/1,
+                    fun test_purge_compact_for_stale_purge_cp_without_client/1,
+                    fun test_purge_compact_for_stale_purge_cp_with_client/1
+                ]
+            }
+        }
+    }.
+
+
+test_purge_single(Db) ->
+    ?_test(begin
+        Result = run_query(Db, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        FDI = couch_db:get_full_doc_info(Db, <<"1">>),
+        Rev = get_rev(FDI),
+        {ok, [{ok, _PRevs}]} = couch_db:purge_docs(
+                Db,
+                [{<<"UUID1">>, <<"1">>, [Rev]}]
+            ),
+        {ok, Db2} = couch_db:reopen(Db),
+
+        Result2 = run_query(Db2, []),
+        Expect2 = {ok, [
+            {meta, [{total, 4}, {offset, 0}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2)
+    end).
+
+
+test_purge_partial(Db) ->
+    ?_test(begin
+        Result = run_query(Db, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1),
+        Update = {[
+            {'_id', <<"1">>},
+            {'_rev', couch_doc:rev_to_str({1, [crypto:hash(md5, <<"1.2">>)]})},
+            {'val', 1.2}
+        ]},
+        {ok, [_Rev2]} = save_docs(Db, [Update], [replicated_changes]),
+
+        PurgeInfos = [{<<"UUID1">>, <<"1">>, [Rev1]}],
+
+        {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+        {ok, Db2} = couch_db:reopen(Db),
+
+        Result2 = run_query(Db2, []),
+        Expect2 = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1.2}, {value, 1.2}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2)
+    end).
+
+
+test_purge_complete(Db) ->
+    ?_test(begin
+        Result = run_query(Db, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1),
+        FDI2 = couch_db:get_full_doc_info(Db, <<"2">>), Rev2 = get_rev(FDI2),
+        FDI5 = couch_db:get_full_doc_info(Db, <<"5">>), Rev5 = get_rev(FDI5),
+
+        PurgeInfos = [
+            {<<"UUID1">>, <<"1">>, [Rev1]},
+            {<<"UUID2">>, <<"2">>, [Rev2]},
+            {<<"UUID5">>, <<"5">>, [Rev5]}
+        ],
+        {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+        {ok, Db2} = couch_db:reopen(Db),
+
+        Result2 = run_query(Db2, []),
+        Expect2 = {ok, [
+            {meta, [{total, 2}, {offset, 0}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}
+        ]},
+        ?assertEqual(Expect2, Result2)
+    end).
+
+
+test_purge_nochange(Db) ->
+    ?_test(begin
+        Result = run_query(Db, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        FDI1 = couch_db:get_full_doc_info(Db, <<"1">>),
+        Rev1 = get_rev(FDI1),
+
+        PurgeInfos = [
+            {<<"UUID1">>, <<"6">>, [Rev1]}
+        ],
+        {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+        {ok, Db2} = couch_db:reopen(Db),
+
+        Result2 = run_query(Db2, []),
+        Expect2 = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2)
+    end).
+
+
+test_purge_index_reset(Db) ->
+    ?_test(begin
+        ok = couch_db:set_purge_infos_limit(Db, 2),
+        {ok, Db1} = couch_db:reopen(Db),
+
+        Result = run_query(Db1, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        PurgeInfos = lists:map(fun(I) ->
+            DocId = list_to_binary(integer_to_list(I)),
+            FDI = couch_db:get_full_doc_info(Db, DocId),
+            Rev = get_rev(FDI),
+            {couch_uuids:random(), DocId, [Rev]}
+        end, lists:seq(1, 5)),
+        {ok, _} = couch_db:purge_docs(Db1, PurgeInfos),
+
+        {ok, Db2} = couch_db:reopen(Db1),
+
+        % Forcibly set the purge doc to a newer purge
+        % sequence to force an index reset. This should
+        % never happen in real life but the reset
+        % is required for correctness.
+        {ok, #doc{body = {OldProps}} = LocalDoc} = get_local_purge_doc(Db2),
+        NewPurgeSeq = {<<"purge_seq">>, 5},
+        NewProps = lists:keyreplace(<<"purge_seq">>, 1, OldProps, NewPurgeSeq),
+        RewindDoc = LocalDoc#doc{body = {NewProps}},
+        {ok, _} = couch_db:update_doc(Db2, RewindDoc, []),
+
+        % Compact the database to remove purge infos
+        {ok, _} = couch_db:start_compact(Db2),
+        wait_compaction(couch_db:name(Db), "database", ?LINE),
+
+        {ok, Db3} = couch_db:reopen(Db2),
+        Result2 = run_query(Db3, []),
+        Expect2 = {ok, [
+            {meta, [{total, 0}, {offset, 0}]}
+        ]},
+        ?assertEqual(Expect2, Result2),
+
+        % Assert that we had a reset
+        meck:wait(
+                1,
+                couch_index_updater,
+                handle_info,
+                [{'EXIT', '_', {reset, '_'}}, '_'],
+                5000
+            )
+    end).
+
+
+test_purge_compact_size_check(Db) ->
+    ?_test(begin
+        DbName = couch_db:name(Db),
+        Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+        {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+        _Result = run_query(Db1, []),
+        DiskSizeBefore = db_disk_size(DbName),
+
+        PurgedDocsNum = 150,
+        IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
+            Id1 = docid(Id),
+            FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+            Rev1 = get_rev(FDI1),
+            UUID1 = uuid(Id),
+            [{UUID1, Id1, [Rev1]} | CIdRevs]
+        end, [], lists:seq(1, PurgedDocsNum)),
+        {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
+
+        {ok, Db2} = couch_db:reopen(Db1),
+        _Result1 = run_query(Db2, []),
+        {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+                Db2,
+                0,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+        config:set("couchdb", "file_compression", "snappy", false),
+
+        {ok, Db3} = couch_db:open_int(DbName, []),
+        {ok, _CompactPid} = couch_db:start_compact(Db3),
+        wait_compaction(DbName, "database", ?LINE),
+        ok = couch_db:close(Db3),
+        DiskSizeAfter = db_disk_size(DbName),
+        ?assert(DiskSizeBefore > DiskSizeAfter)
+    end).
+
+
+test_purge_compact_for_stale_purge_cp_without_client(Db) ->
+    ?_test(begin
+        DbName = couch_db:name(Db),
+        % add more documents to database for purge
+        Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+        {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+
+        % change PurgedDocsLimit to 10 from 1000 to
+        % avoid timeout of eunit test
+        PurgedDocsLimit = 10,
+        couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
+
+        % purge 150 documents
+        PurgedDocsNum = 150,
+        PurgeInfos = lists:foldl(fun(Id, CIdRevs) ->
+            Id1 = docid(Id),
+            FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+            Rev1 = get_rev(FDI1),
+            UUID1 = uuid(Id),
+            [{UUID1, Id1, [Rev1]} | CIdRevs]
+        end, [], lists:seq(1, PurgedDocsNum)),
+        {ok, _} = couch_db:purge_docs(Db1, PurgeInfos),
+
+        {ok, Db2} = couch_db:reopen(Db1),
+        {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+                Db2,
+                0,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+
+        % run compaction to trigger pruning of purge tree
+        {ok, Db3} = couch_db:open_int(DbName, []),
+        {ok, _CompactPid} = couch_db:start_compact(Db3),
+        wait_compaction(DbName, "database", ?LINE),
+        ok = couch_db:close(Db3),
+
+        % check the remaining purge requests in purge tree
+        {ok, Db4} = couch_db:reopen(Db3),
+        OldestPSeq = couch_db:get_oldest_purge_seq(Db4),
+        {ok, PurgedIdRevs2} = couch_db:fold_purge_infos(
+                Db4,
+                OldestPSeq - 1,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2))
+    end).
+
+
+test_purge_compact_for_stale_purge_cp_with_client(Db) ->
+    ?_test(begin
+        DbName = couch_db:name(Db),
+        % add more documents to database for purge
+        Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+        {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+
+        % change PurgedDocsLimit to 10 from 1000 to
+        % avoid timeout of eunit test
+        PurgedDocsLimit = 10,
+        couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
+        _Result = run_query(Db1, []),
+
+        % first purge 30 documents
+        PurgedDocsNum1 = 30,
+        IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
+            Id1 = docid(Id),
+            FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+            Rev1 = get_rev(FDI1),
+            UUID1 = uuid(Id),
+            [{UUID1, Id1, [Rev1]} | CIdRevs]
+        end, [], lists:seq(1, PurgedDocsNum1)),
+        {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
+
+        {ok, Db2} = couch_db:reopen(Db1),
+        % run query again to reflect purge request to mrview
+        _Result1 = run_query(Db2, []),
+        {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+                Db2,
+                0,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsNum1, length(PurgedIdRevs)),
+
+        % then purge 120 documents
+        PurgedDocsNum2 = 150,
+        IdsRevs2 = lists:foldl(fun(Id, CIdRevs) ->
+            Id1 = docid(Id),
+            FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+            Rev1 = get_rev(FDI1),
+            UUID1 = uuid(Id),
+            [{UUID1, Id1, [Rev1]} | CIdRevs]
+        end, [], lists:seq(PurgedDocsNum1 + 1, PurgedDocsNum2)),
+        {ok, _} = couch_db:purge_docs(Db2, IdsRevs2),
+
+        % run compaction to trigger pruning of purge tree
+        % only the first 30 purge requests are pruned
+        {ok, Db3} = couch_db:open_int(DbName, []),
+        {ok, _CompactPid} = couch_db:start_compact(Db3),
+        wait_compaction(DbName, "database", ?LINE),
+        ok = couch_db:close(Db3),
+
+        % check the remaining purge requests in purge tree
+        {ok, Db4} = couch_db:reopen(Db3),
+        OldestPSeq = couch_db:get_oldest_purge_seq(Db4),
+        {ok, PurgedIdRevs2} = couch_db:fold_purge_infos(
+                Db4,
+                OldestPSeq - 1,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsNum2 - PurgedDocsNum1, length(PurgedIdRevs2))
+    end).
+
+
+get_local_purge_doc(Db) ->
+    {ok, DDoc} = couch_db:open_doc(Db, <<"_design/bar">>, []),
+    {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc),
+    Sig = IdxState#mrst.sig,
+    HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+    DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+    couch_db:open_doc(Db, DocId, []).
+
+
+run_query(Db, Opts) ->
+    couch_mrview:query_view(Db, <<"_design/bar">>, <<"baz">>, Opts).
+
+
+save_docs(Db, JsonDocs, Options) ->
+    Docs = lists:map(fun(JDoc) ->
+        couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE(JDoc)))
+                     end, JsonDocs),
+    Opts = [full_commit | Options],
+    case lists:member(replicated_changes, Options) of
+        true ->
+            {ok, []} = couch_db:update_docs(
+                Db, Docs, Opts, replicated_changes),
+            {ok, lists:map(fun(Doc) ->
+                {Pos, [RevId | _]} = Doc#doc.revs,
+                {Pos, RevId}
+                           end, Docs)};
+        false ->
+            {ok, Resp} = couch_db:update_docs(Db, Docs, Opts),
+            {ok, [Rev || {ok, Rev} <- Resp]}
+    end.
+
+
+get_rev(#full_doc_info{} = FDI) ->
+    #doc_info{
+        revs = [#rev_info{} = PrevRev | _]
+    } = couch_doc:to_doc_info(FDI),
+    PrevRev#rev_info.rev.
+
+
+db_disk_size(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, Info} = couch_db:get_db_info(Db),
+    ok = couch_db:close(Db),
+    active_size(Info).
+
+
+active_size(Info) ->
+    couch_util:get_nested_json_value({Info}, [sizes, active]).
+
+
+wait_compaction(DbName, Kind, Line) ->
+    WaitFun = fun() ->
+        case is_compaction_running(DbName) of
+            true -> wait;
+            false -> ok
+        end
+    end,
+    case test_util:wait(WaitFun, 10000) of
+        timeout ->
+            erlang:error({assertion_failed,
+                [{module, ?MODULE},
+                    {line, Line},
+                    {reason, "Timeout waiting for "
+                        ++ Kind
+                        ++ " database compaction"}]});
+        _ ->
+            ok
+    end.
+
+
+is_compaction_running(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, DbInfo} = couch_db:get_db_info(Db),
+    couch_db:close(Db),
+    couch_util:get_value(compact_running, DbInfo).
+
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+    {ok, [{Id, Revs} | Acc]}.
+
+
+docid(I) ->
+    list_to_binary(integer_to_list(I)).
+
+
+uuid(I) ->
+    Str = io_lib:format("UUID~4..0b", [I]),
+    iolist_to_binary(Str).