Posted to commits@couchdb.apache.org by da...@apache.org on 2018/06/01 17:11:58 UTC

[couchdb] 06/10: [06/10] Clustered Purge: Update mrview indexes

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit c67815e8272ef2e90aa6a2a07522a9590bbbc49a
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Apr 24 12:26:42 2018 -0500

    [06/10] Clustered Purge: Update mrview indexes
    
    This commit updates the mrview secondary index to properly process the
    new history of purge requests as well as to store the _local purge
    checkpoint doc.
    
    The _local checkpoint doc ensures that database compaction does not
    remove any purge requests that have not yet been processed by this
    secondary index.
    
    COUCHDB-3326
    
    Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
    Co-authored-by: jiangphcn <ji...@cn.ibm.com>
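    
    For reference, a minimal sketch of this checkpoint doc as written by
    couch_mrview_index:update_local_purge_doc/3 further down in this diff;
    all concrete values below are hypothetical, and the "v1" in the doc id
    assumes the default "purge"/"version" config of "1":
    
        %% Illustrative only -- field names match update_local_purge_doc/3.
        LocalPurgeDoc = couch_doc:from_json_obj({[
            {<<"_id">>, <<"_local/purge-v1-mrview-3e82d1d55182f0ac5431eb0b1d301dc7">>},
            {<<"type">>, <<"mrview">>},
            {<<"purge_seq">>, 12},          % purge seq the index has processed up to
            {<<"updated_on">>, 1527873118}, % seconds since the unix epoch
            {<<"verify_module">>, <<"couch_mrview_index">>},
            {<<"verify_function">>, <<"verify_index_exists">>},
            {<<"dbname">>, <<"shards/00000000-1fffffff/mydb.1527873100">>},
            {<<"ddoc_id">>, <<"_design/bar">>},
            {<<"signature">>, <<"3e82d1d55182f0ac5431eb0b1d301dc7">>}
        ]}).
    
    The on_compact hook added in couch_index_plugin_couch_db below makes sure
    these docs exist before a shard is compacted, which is how compaction
    knows which purge requests are still needed by an index.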
---
 src/couch/test/couch_bt_engine_upgrade_tests.erl   |  51 ++-
 .../fixtures/db_with_1_purge_req_for_2_docs.couch  | Bin 0 -> 16557 bytes
 src/couch_index/src/couch_index_epi.erl            |   5 +-
 ...dex_epi.erl => couch_index_plugin_couch_db.erl} |  38 +-
 src/couch_index/src/couch_index_updater.erl        |  48 +-
 src/couch_mrview/src/couch_mrview_cleanup.erl      |  18 +-
 src/couch_mrview/src/couch_mrview_index.erl        | 117 ++++-
 src/couch_mrview/src/couch_mrview_test_util.erl    |   5 +
 src/couch_mrview/src/couch_mrview_updater.erl      |  14 +-
 src/couch_mrview/src/couch_mrview_util.erl         |  23 +
 .../test/couch_mrview_purge_docs_fabric_tests.erl  | 272 +++++++++++
 .../test/couch_mrview_purge_docs_tests.erl         | 495 +++++++++++++++++++++
 12 files changed, 1023 insertions(+), 63 deletions(-)

diff --git a/src/couch/test/couch_bt_engine_upgrade_tests.erl b/src/couch/test/couch_bt_engine_upgrade_tests.erl
index b4120ec..6aef366 100644
--- a/src/couch/test/couch_bt_engine_upgrade_tests.erl
+++ b/src/couch/test/couch_bt_engine_upgrade_tests.erl
@@ -22,7 +22,8 @@ setup() ->
     DbFileNames = [
         "db_without_purge_req.couch",
         "db_with_1_purge_req.couch",
-        "db_with_2_purge_req.couch"
+        "db_with_2_purge_req.couch",
+        "db_with_1_purge_req_for_2_docs.couch"
     ],
     NewPaths = lists:map(fun(DbFileName) ->
         OldDbFilePath = filename:join([?FIXTURESDIR, DbFileName]),
@@ -52,7 +53,8 @@ upgrade_test_() ->
             [
                 t_upgrade_without_purge_req(),
                 t_upgrade_with_1_purge_req(),
-                t_upgrade_with_N_purge_req()
+                t_upgrade_with_N_purge_req(),
+                t_upgrade_with_1_purge_req_for_2_docs()
             ]
         }
     }.
@@ -70,7 +72,9 @@ t_upgrade_without_purge_req() ->
         end),
         ?assertEqual([], UpgradedPurged),
 
-        {ok, Rev} = save_doc(DbName, {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]}),
+        {ok, Rev} = save_doc(
+            DbName, {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]}
+        ),
         {ok, _} = save_doc(DbName, {[{<<"_id">>, <<"doc5">>}, {<<"v">>, 2}]}),
 
         couch_util:with_db(DbName, fun(Db) ->
@@ -106,7 +110,9 @@ t_upgrade_with_1_purge_req() ->
         end),
         ?assertEqual([{1, <<"doc1">>}], UpgradedPurged),
 
-        {ok, Rev} = save_doc(DbName, {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]}),
+        {ok, Rev} = save_doc(
+            DbName, {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]}
+        ),
         {ok, _} = save_doc(DbName, {[{<<"_id">>, <<"doc5">>}, {<<"v">>, 2}]}),
 
         couch_util:with_db(DbName, fun(Db) ->
@@ -166,6 +172,43 @@ t_upgrade_with_N_purge_req() ->
     end).
 
 
+t_upgrade_with_1_purge_req_for_2_docs() ->
+    ?_test(begin
+        % The fixture database contains two live documents (Doc4 and Doc5)
+        % and three purged documents (Doc1, Doc2 and Doc3): one purge request
+        % covered Doc1, and a second purge request covered both Doc2 and Doc3
+        DbName = <<"db_with_1_purge_req_for_2_docs">>,
+
+        {ok, UpgradedPurged} = couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 3}, couch_db:get_purge_seq(Db)),
+            couch_db:fold_purge_infos(Db, 1, fun fold_fun/2, [])
+        end),
+        ?assertEqual([{3,<<"doc2">>},{2,<<"doc3">>}], UpgradedPurged),
+
+        {ok, Rev} = save_doc(DbName, {[{<<"_id">>, <<"doc6">>}, {<<"v">>, 1}]}),
+        {ok, _} = save_doc(DbName, {[{<<"_id">>, <<"doc7">>}, {<<"v">>, 2}]}),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 4}, couch_db:get_doc_count(Db)),
+            ?assertEqual({ok, 3}, couch_db:get_purge_seq(Db))
+        end),
+
+        PurgeReqs = [
+            {couch_uuids:random(), <<"doc6">>, [Rev]}
+        ],
+
+        {ok, [{ok, PRevs}]} = couch_util:with_db(DbName, fun(Db) ->
+            couch_db:purge_docs(Db, PurgeReqs)
+        end),
+        ?assertEqual(PRevs, [Rev]),
+
+        couch_util:with_db(DbName, fun(Db) ->
+            ?assertEqual({ok, 3}, couch_db:get_doc_count(Db)),
+            ?assertEqual({ok, 4}, couch_db:get_purge_seq(Db))
+        end)
+    end).
+
+
 save_doc(DbName, Json) ->
     Doc = couch_doc:from_json_obj(Json),
     couch_util:with_db(DbName, fun(Db) ->
diff --git a/src/couch/test/fixtures/db_with_1_purge_req_for_2_docs.couch b/src/couch/test/fixtures/db_with_1_purge_req_for_2_docs.couch
new file mode 100644
index 0000000..b584fce
Binary files /dev/null and b/src/couch/test/fixtures/db_with_1_purge_req_for_2_docs.couch differ
diff --git a/src/couch_index/src/couch_index_epi.erl b/src/couch_index/src/couch_index_epi.erl
index 946a590..1c4eb95 100644
--- a/src/couch_index/src/couch_index_epi.erl
+++ b/src/couch_index/src/couch_index_epi.erl
@@ -28,8 +28,9 @@ app() ->
     couch_index.
 
 providers() ->
-    [].
-
+    [
+        {couch_db, couch_index_plugin_couch_db}
+    ].
 
 services() ->
     [
diff --git a/src/couch_index/src/couch_index_epi.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl
similarity index 52%
copy from src/couch_index/src/couch_index_epi.erl
copy to src/couch_index/src/couch_index_plugin_couch_db.erl
index 946a590..937f7c8 100644
--- a/src/couch_index/src/couch_index_epi.erl
+++ b/src/couch_index/src/couch_index_plugin_couch_db.erl
@@ -2,7 +2,7 @@
 % use this file except in compliance with the License. You may obtain a copy of
 % the License at
 %
-% http://www.apache.org/licenses/LICENSE-2.0
+%   http://www.apache.org/licenses/LICENSE-2.0
 %
 % Unless required by applicable law or agreed to in writing, software
 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
@@ -10,40 +10,12 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(couch_index_epi).
-
--behaviour(couch_epi_plugin).
+-module(couch_index_plugin_couch_db).
 
 -export([
-    app/0,
-    providers/0,
-    services/0,
-    data_subscriptions/0,
-    data_providers/0,
-    processes/0,
-    notify/3
+    on_compact/2
 ]).
 
-app() ->
-    couch_index.
-
-providers() ->
-    [].
-
-
-services() ->
-    [
-        {couch_index, couch_index_plugin}
-    ].
-
-data_subscriptions() ->
-    [].
-
-data_providers() ->
-    [].
-
-processes() ->
-    [].
 
-notify(_Key, _Old, _New) ->
-    ok.
+on_compact(DbName, DDocs) ->
+    couch_mrview_index:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl
index 5ab9ea8..73361a9 100644
--- a/src/couch_index/src/couch_index_updater.erl
+++ b/src/couch_index/src/couch_index_updater.erl
@@ -141,12 +141,10 @@ update(Idx, Mod, IdxState) ->
         DbUpdateSeq = couch_db:get_update_seq(Db),
         DbCommittedSeq = couch_db:get_committed_update_seq(Db),
 
-        PurgedIdxState = case purge_index(Db, Mod, IdxState) of
-            {ok, IdxState0} -> IdxState0;
-            reset -> exit({reset, self()})
-        end,
-
-        NumChanges = couch_db:count_changes_since(Db, CurrSeq),
+        NumUpdateChanges = couch_db:count_changes_since(Db, CurrSeq),
+        NumPurgeChanges = count_pending_purged_docs_since(Db, Mod, IdxState),
+        TotalChanges = NumUpdateChanges + NumPurgeChanges,
+        {ok, PurgedIdxState} = purge_index(Db, Mod, IdxState),
 
         GetSeq = fun
             (#full_doc_info{update_seq=Seq}) -> Seq;
@@ -185,8 +183,13 @@ update(Idx, Mod, IdxState) ->
                     {ok, {NewSt, true}}
             end
         end,
+        {ok, InitIdxState} = Mod:start_update(
+                Idx,
+                PurgedIdxState,
+                TotalChanges,
+                NumPurgeChanges
+            ),
 
-        {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges),
         Acc0 = {InitIdxState, true},
         {ok, Acc} = couch_db:fold_changes(Db, CurrSeq, Proc, Acc0, []),
         {ProcIdxSt, SendLast} = Acc,
@@ -208,12 +211,27 @@ update(Idx, Mod, IdxState) ->
 purge_index(Db, Mod, IdxState) ->
     {ok, DbPurgeSeq} = couch_db:get_purge_seq(Db),
     IdxPurgeSeq = Mod:get(purge_seq, IdxState),
-    if
-        DbPurgeSeq == IdxPurgeSeq ->
-            {ok, IdxState};
-        DbPurgeSeq == IdxPurgeSeq + 1 ->
-            {ok, PurgedIdRevs} = couch_db:get_last_purged(Db),
-            Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState);
-        true ->
-            reset
+    if IdxPurgeSeq == DbPurgeSeq -> {ok, IdxState}; true ->
+        FoldFun = fun({PurgeSeq, _UUId, Id, Revs}, Acc) ->
+            Mod:purge(Db, PurgeSeq, [{Id, Revs}], Acc)
+        end,
+        {ok, NewStateAcc} = try
+            couch_db:fold_purge_infos(
+                    Db,
+                    IdxPurgeSeq,
+                    FoldFun,
+                    IdxState,
+                    []
+                )
+        catch error:{invalid_start_purge_seq, _} ->
+            exit({reset, self()})
+        end,
+        Mod:update_local_purge_doc(Db, NewStateAcc),
+        {ok, NewStateAcc}
     end.
+
+
+count_pending_purged_docs_since(Db, Mod, IdxState) ->
+    {ok, DbPurgeSeq} = couch_db:get_purge_seq(Db),
+    IdxPurgeSeq = Mod:get(purge_seq, IdxState),
+    DbPurgeSeq - IdxPurgeSeq.
diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl
index 380376d..93c9387 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -41,7 +41,23 @@ run(Db) ->
 
     lists:foreach(fun(FN) ->
         couch_log:debug("Deleting stale view file: ~s", [FN]),
-        couch_file:delete(RootDir, FN, [sync])
+        couch_file:delete(RootDir, FN, [sync]),
+        Sig = couch_mrview_util:get_signature_from_filename(FN),
+        if length(Sig) < 16 -> ok; true ->
+            case re:run(Sig,"^[a-fA-F0-9]+$",[{capture, none}]) of
+                match ->
+                    DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+                    case couch_db:open_doc(Db, DocId, []) of
+                        {ok, LocalPurgeDoc} ->
+                            couch_db:update_doc(Db,
+                                LocalPurgeDoc#doc{deleted=true}, [?ADMIN_CTX]);
+                        {not_found, _} ->
+                            ok
+                    end;
+                _ ->
+                    ok
+            end
+        end
     end, ToDelete),
 
     ok.
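
For reference, the couch_mrview_util helpers this hunk relies on (added
later in this diff) compose roughly as follows; the file path, signature,
and the "v1" prefix (default "purge"/"version" config of "1") are all
hypothetical, and only the final path component matters to
get_signature_from_filename/1:

    1> FN = "data/.shards/00000000-1fffffff/mydb.1527873100_design/mrview/3e82d1d55182f0ac5431eb0b1d301dc7.view".
    "data/.shards/00000000-1fffffff/mydb.1527873100_design/mrview/3e82d1d55182f0ac5431eb0b1d301dc7.view"
    2> Sig = couch_mrview_util:get_signature_from_filename(FN).
    "3e82d1d55182f0ac5431eb0b1d301dc7"
    3> couch_mrview_util:get_local_purge_doc_id(Sig).
    <<"_local/purge-v1-mrview-3e82d1d55182f0ac5431eb0b1d301dc7">>

The resulting doc id is what the cleanup code above opens and deletes once
the corresponding view file has become stale.
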
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index aa1ee27..7756f52 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -15,9 +15,11 @@
 
 -export([get/2]).
 -export([init/2, open/2, close/1, reset/1, delete/1, shutdown/1]).
--export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]).
+-export([start_update/4, purge/4, process_doc/3, finish_update/1, commit/1]).
 -export([compact/3, swap_compacted/2, remove_compacted/1]).
 -export([index_file_exists/1]).
+-export([update_local_purge_doc/2, verify_index_exists/1]).
+-export([ensure_local_purge_docs/2]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -121,14 +123,17 @@ open(Db, State) ->
                 {ok, {OldSig, Header}} ->
                     % Matching view signatures.
                     NewSt = couch_mrview_util:init_state(Db, Fd, State, Header),
+                    ensure_local_purge_doc(Db, NewSt),
                     {ok, NewSt};
                 % end of upgrade code for <= 1.2.x
                 {ok, {Sig, Header}} ->
                     % Matching view signatures.
                     NewSt = couch_mrview_util:init_state(Db, Fd, State, Header),
+                    ensure_local_purge_doc(Db, NewSt),
                     {ok, NewSt};
                 _ ->
                     NewSt = couch_mrview_util:reset_index(Db, Fd, State),
+                    ensure_local_purge_doc(Db, NewSt),
                     {ok, NewSt}
             end;
         {error, Reason} = Error ->
@@ -167,8 +172,13 @@ reset(State) ->
     end).
 
 
-start_update(PartialDest, State, NumChanges) ->
-    couch_mrview_updater:start_update(PartialDest, State, NumChanges).
+start_update(PartialDest, State, NumChanges, NumChangesDone) ->
+    couch_mrview_updater:start_update(
+        PartialDest,
+        State,
+        NumChanges,
+        NumChangesDone
+    ).
 
 
 purge(Db, PurgeSeq, PurgedIdRevs, State) ->
@@ -207,3 +217,104 @@ index_file_exists(State) ->
     } = State,
     IndexFName = couch_mrview_util:index_file(DbName, Sig),
     filelib:is_file(IndexFName).
+
+
+verify_index_exists(Props) ->
+    ShardDbName = couch_mrview_util:get_value_from_options(
+        <<"dbname">>,
+        Props
+    ),
+    DDocId = couch_mrview_util:get_value_from_options(
+        <<"ddoc_id">>,
+        Props
+    ),
+    SigInLocal = couch_mrview_util:get_value_from_options(
+        <<"signature">>,
+        Props
+    ),
+    case couch_db:open_int(ShardDbName, []) of
+        {ok, Db} ->
+            try
+                DbName = mem3:dbname(couch_db:name(Db)),
+                case ddoc_cache:open(DbName, DDocId) of
+                    {ok, DDoc} ->
+                        {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(
+                            ShardDbName,
+                            DDoc
+                        ),
+                        IdxSig = IdxState#mrst.sig,
+                        couch_index_util:hexsig(IdxSig) == SigInLocal;
+                    _Else ->
+                        false
+                end
+            catch E:T ->
+                Stack = erlang:get_stacktrace(),
+                couch_log:error(
+                    "Error occurred when verifying existence of ~s/~s :: ~p ~p",
+                    [ShardDbName, DDocId, {E, T}, Stack]
+                ),
+                false
+            after
+                catch couch_db:close(Db)
+            end;
+        _ ->
+            false
+    end.
+
+
+ensure_local_purge_docs(DbName, DDocs) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        lists:foreach(fun(DDoc) ->
+            try couch_mrview_util:ddoc_to_mrst(DbName, DDoc) of
+                {ok, MRSt} ->
+                    ensure_local_purge_doc(Db, MRSt)
+            catch _:_ ->
+                ok
+            end
+        end, DDocs)
+    end).
+
+
+ensure_local_purge_doc(Db, #mrst{}=State) ->
+    Sig = couch_index_util:hexsig(get(signature, State)),
+    DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+    case couch_db:open_doc(Db, DocId, []) of
+        {not_found, _} ->
+            create_local_purge_doc(Db, State);
+        {ok, _} ->
+            ok
+    end.
+
+
+create_local_purge_doc(Db, State) ->
+    {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+    update_local_purge_doc(Db, State, PurgeSeq).
+
+
+update_local_purge_doc(Db, State) ->
+    update_local_purge_doc(Db, State, get(purge_seq, State)).
+
+
+update_local_purge_doc(Db, State, PSeq) ->
+    Sig = couch_index_util:hexsig(State#mrst.sig),
+    DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    BaseDoc = couch_doc:from_json_obj({[
+        {<<"_id">>, DocId},
+        {<<"type">>, <<"mrview">>},
+        {<<"purge_seq">>, PSeq},
+        {<<"updated_on">>, NowSecs},
+        {<<"verify_module">>, <<"couch_mrview_index">>},
+        {<<"verify_function">>, <<"verify_index_exists">>},
+        {<<"dbname">>, get(db_name, State)},
+        {<<"ddoc_id">>, get(idx_name, State)},
+        {<<"signature">>, Sig}
+    ]}),
+    Doc = case couch_db:open_doc(Db, DocId, []) of
+        {ok, #doc{revs = Revs}} ->
+            BaseDoc#doc{revs = Revs};
+        {not_found, _} ->
+            BaseDoc
+    end,
+    couch_db:update_doc(Db, Doc, []).
diff --git a/src/couch_mrview/src/couch_mrview_test_util.erl b/src/couch_mrview/src/couch_mrview_test_util.erl
index b07b076..35ab6c6 100644
--- a/src/couch_mrview/src/couch_mrview_test_util.erl
+++ b/src/couch_mrview/src/couch_mrview_test_util.erl
@@ -49,6 +49,11 @@ make_docs(local, Count) ->
 make_docs(_, Count) ->
     [doc(I) || I <- lists:seq(1, Count)].
 
+
+make_docs(_, Since, Count) ->
+    [doc(I) || I <- lists:seq(Since, Count)].
+
+
 ddoc({changes, Opts}) ->
     ViewOpts = case Opts of
         seq_indexed ->
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 214f487..3383b49 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -12,15 +12,14 @@
 
 -module(couch_mrview_updater).
 
--export([start_update/3, purge/4, process_doc/3, finish_update/1]).
+-export([start_update/4, purge/4, process_doc/3, finish_update/1]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 -define(REM_VAL, removed).
 
-
-start_update(Partial, State, NumChanges) ->
+start_update(Partial, State, NumChanges, NumChangesDone) ->
     MaxSize = config:get_integer("view_updater", "queue_memory_cap", 100000),
     MaxItems = config:get_integer("view_updater", "queue_item_cap", 500),
     QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}],
@@ -36,14 +35,19 @@ start_update(Partial, State, NumChanges) ->
     },
 
     Self = self(),
+
     MapFun = fun() ->
+        Progress = case NumChanges of
+            0 -> 0;
+            _ -> (NumChangesDone * 100) div NumChanges
+        end,
         couch_task_status:add_task([
             {indexer_pid, ?l2b(pid_to_list(Partial))},
             {type, indexer},
             {database, State#mrst.db_name},
             {design_document, State#mrst.idx_name},
-            {progress, 0},
-            {changes_done, 0},
+            {progress, Progress},
+            {changes_done, NumChangesDone},
             {total_changes, NumChanges}
         ]),
         couch_task_status:set_update_frequency(500),
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 0c6e5fc..710cb28 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -13,6 +13,8 @@
 -module(couch_mrview_util).
 
 -export([get_view/4, get_view_index_pid/4]).
+-export([get_local_purge_doc_id/1, get_value_from_options/2]).
+-export([get_signature_from_filename/1]).
 -export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
 -export([make_header/1]).
 -export([index_file/2, compaction_file/2, open_file/1]).
@@ -41,6 +43,27 @@
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 
+get_local_purge_doc_id(Sig) ->
+    Version = "v" ++ config:get("purge", "version", "1") ++ "-",
+    ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ Version ++ "mrview-" ++ Sig).
+
+
+get_value_from_options(Key, Options) ->
+    case couch_util:get_value(Key, Options) of
+        undefined ->
+            Reason = binary_to_list(Key) ++ " must exist in Options.",
+            throw({bad_request, Reason});
+        Value -> Value
+    end.
+
+
+get_signature_from_filename(FileName) ->
+    FilePathList = filename:split(FileName),
+    [PureFN] = lists:nthtail(length(FilePathList) - 1, FilePathList),
+    PureFNExt = filename:extension(PureFN),
+    filename:basename(PureFN, PureFNExt).
+
+
 get_view(Db, DDoc, ViewName, Args0) ->
     case get_view_index_state(Db, DDoc, ViewName, Args0) of
         {ok, State, Args2} ->
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
new file mode 100644
index 0000000..99bfa9e
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
@@ -0,0 +1,272 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_purge_docs_fabric_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+    DbName = ?tempdb(),
+    ok = fabric:create_db(DbName, [?ADMIN_CTX, {q, 1}]),
+    meck:new(couch_mrview_index, [passthrough]),
+    meck:expect(couch_mrview_index, ensure_local_purge_docs, fun(A, B) ->
+        meck:passthrough([A, B])
+    end),
+    DbName.
+
+
+teardown(DbName) ->
+    meck:unload(couch_mrview_index),
+    ok = fabric:delete_db(DbName, [?ADMIN_CTX]).
+
+
+view_purge_fabric_test_() ->
+    {
+        "Map views",
+        {
+            setup,
+            fun() -> test_util:start_couch([fabric, mem3]) end,
+            fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0,
+                fun teardown/1,
+                [
+                    fun test_purge_verify_index/1,
+                    fun test_purge_hook_before_compaction/1
+                ]
+            }
+        }
+    }.
+
+
+test_purge_verify_index(DbName) ->
+    ?_test(begin
+        Docs1 = couch_mrview_test_util:make_docs(normal, 5),
+        {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]),
+        {ok, _} = fabric:update_doc(
+            DbName,
+            couch_mrview_test_util:ddoc(map),
+            [?ADMIN_CTX]
+        ),
+
+        Result1 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect1 = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect1, Result1),
+
+        {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName),
+        ?assertEqual(0, couch_util:get_value(<<"purge_seq">>, Props1)),
+        ?assertEqual(true, couch_mrview_index:verify_index_exists(Props1)),
+
+        purge_docs(DbName, [<<"1">>]),
+
+        Result2 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect2 = {ok, [
+            {meta, [{total, 4}, {offset, 0}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2),
+
+        {ok, #doc{body = {Props2}}} = get_local_purge_doc(DbName),
+        ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props2)),
+        ?assertEqual(true, couch_mrview_index:verify_index_exists(Props2))
+    end).
+
+
+test_purge_hook_before_compaction(DbName) ->
+    ?_test(begin
+        Docs1 = couch_mrview_test_util:make_docs(normal, 5),
+        {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]),
+        {ok, _} = fabric:update_doc(
+            DbName,
+            couch_mrview_test_util:ddoc(map),
+            [?ADMIN_CTX]
+        ),
+
+        Result1 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect1 = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect1, Result1),
+
+        purge_docs(DbName, [<<"1">>]),
+
+        Result2 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect2 = {ok, [
+            {meta, [{total, 4}, {offset, 0}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2),
+
+        {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName),
+        ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props1)),
+
+        [ShardName | _] = local_shards(DbName),
+        couch_util:with_db(ShardName, fun(Db) ->
+            {ok, _} = couch_db:start_compact(Db)
+        end),
+        wait_compaction(ShardName, ?LINE),
+
+        ?assertEqual(ok, meck:wait(1, couch_mrview_index,
+            ensure_local_purge_docs, '_', 5000)
+        ),
+
+        % Make sure compaction didn't change the purge seq
+        {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName),
+        ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props1)),
+
+        purge_docs(DbName, [<<"2">>]),
+
+        couch_util:with_db(ShardName, fun(Db) ->
+            {ok, _} = couch_db:start_compact(Db)
+        end),
+        wait_compaction(ShardName, ?LINE),
+
+        ?assertEqual(ok, meck:wait(2, couch_mrview_index,
+            ensure_local_purge_docs, '_', 5000)
+        ),
+
+        % Make sure compaction after a purge didn't overwrite
+        % the local purge doc for the index
+        {ok, #doc{body = {Props2}}} = get_local_purge_doc(DbName),
+        ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props2)),
+
+        % Force another update to ensure that we update
+        % the local doc appropriately after compaction
+        Result3 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+        Expect3 = {ok, [
+            {meta, [{total, 3}, {offset, 0}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect3, Result3),
+
+        {ok, #doc{body = {Props3}}} = get_local_purge_doc(DbName),
+        ?assertEqual(2, couch_util:get_value(<<"purge_seq">>, Props3)),
+
+        % Check that if the local doc doesn't exist that one
+        % is created for the index on compaction
+        delete_local_purge_doc(DbName),
+        ?assertMatch({not_found, _}, get_local_purge_doc(DbName)),
+
+        couch_util:with_db(ShardName, fun(Db) ->
+            {ok, _} = couch_db:start_compact(Db)
+        end),
+        wait_compaction(ShardName, ?LINE),
+
+        ?assertEqual(ok, meck:wait(3, couch_mrview_index,
+            ensure_local_purge_docs, '_', 5000)
+        ),
+
+        {ok, #doc{body = {Props4}}} = get_local_purge_doc(DbName),
+        ?assertEqual(2, couch_util:get_value(<<"purge_seq">>, Props4))
+    end).
+
+
+get_local_purge_doc(DbName) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/bar">>, []),
+    {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
+    Sig = IdxState#mrst.sig,
+    HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+    DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+    [ShardName | _] = local_shards(DbName),
+    couch_util:with_db(ShardName, fun(Db) ->
+        couch_db:open_doc(Db, DocId, [])
+    end).
+
+
+delete_local_purge_doc(DbName) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/bar">>, []),
+    {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
+    Sig = IdxState#mrst.sig,
+    HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+    DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+    NewDoc = #doc{id = DocId, deleted = true},
+    [ShardName | _] = local_shards(DbName),
+    couch_util:with_db(ShardName, fun(Db) ->
+        {ok, _} = couch_db:update_doc(Db, NewDoc, [])
+    end).
+
+
+get_rev(#full_doc_info{} = FDI) ->
+    #doc_info{
+        revs = [#rev_info{} = PrevRev | _]
+    } = couch_doc:to_doc_info(FDI),
+    PrevRev#rev_info.rev.
+
+
+purge_docs(DbName, DocIds) ->
+    lists:foreach(fun(DocId) ->
+        FDI = fabric:get_full_doc_info(DbName, DocId, []),
+        Rev = get_rev(FDI),
+        {ok, [{ok, _}]} = fabric:purge_docs(DbName, [{DocId, [Rev]}], [])
+    end, DocIds).
+
+
+wait_compaction(DbName, Line) ->
+    WaitFun = fun() ->
+        case is_compaction_running(DbName) of
+            true -> wait;
+            false -> ok
+        end
+    end,
+    case test_util:wait(WaitFun, 10000) of
+        timeout ->
+            erlang:error({assertion_failed, [
+                    {module, ?MODULE},
+                    {line, Line},
+                    {reason, "Timeout waiting for database compaction"}
+                ]});
+        _ ->
+            ok
+    end.
+
+
+is_compaction_running(DbName) ->
+    {ok, DbInfo} = couch_util:with_db(DbName, fun(Db) ->
+        couch_db:get_db_info(Db)
+    end),
+    couch_util:get_value(compact_running, DbInfo).
+
+
+local_shards(DbName) ->
+    try
+        [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+    catch
+        error:database_does_not_exist ->
+            []
+    end.
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
new file mode 100644
index 0000000..d5bf9f6
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
@@ -0,0 +1,495 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_purge_docs_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+    meck:new(couch_index_updater, [passthrough]),
+    {ok, Db} = couch_mrview_test_util:init_db(?tempdb(), map, 5),
+    Db.
+
+teardown(Db) ->
+    couch_db:close(Db),
+    couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
+    meck:unload(),
+    ok.
+
+view_purge_test_() ->
+    {
+        "Map views",
+        {
+            setup,
+            fun test_util:start_couch/0,
+            fun test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0,
+                fun teardown/1,
+                [
+                    fun test_purge_single/1,
+                    fun test_purge_partial/1,
+                    fun test_purge_complete/1,
+                    fun test_purge_nochange/1,
+                    fun test_purge_index_reset/1,
+                    fun test_purge_compact_size_check/1,
+                    fun test_purge_compact_for_stale_purge_cp_without_client/1,
+                    fun test_purge_compact_for_stale_purge_cp_with_client/1
+                ]
+            }
+        }
+    }.
+
+
+test_purge_single(Db) ->
+    ?_test(begin
+        Result = run_query(Db, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        FDI = couch_db:get_full_doc_info(Db, <<"1">>),
+        Rev = get_rev(FDI),
+        {ok, [{ok, _PRevs}]} = couch_db:purge_docs(
+                Db,
+                [{<<"UUID1">>, <<"1">>, [Rev]}]
+            ),
+        {ok, Db2} = couch_db:reopen(Db),
+
+        Result2 = run_query(Db2, []),
+        Expect2 = {ok, [
+            {meta, [{total, 4}, {offset, 0}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2)
+    end).
+
+
+test_purge_partial(Db) ->
+    ?_test(begin
+        Result = run_query(Db, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1),
+        Update = {[
+            {'_id', <<"1">>},
+            {'_rev', couch_doc:rev_to_str({1, [crypto:hash(md5, <<"1.2">>)]})},
+            {'val', 1.2}
+        ]},
+        {ok, [_Rev2]} = save_docs(Db, [Update], [replicated_changes]),
+
+        PurgeInfos = [{<<"UUID1">>, <<"1">>, [Rev1]}],
+
+        {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+        {ok, Db2} = couch_db:reopen(Db),
+
+        Result2 = run_query(Db2, []),
+        Expect2 = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1.2}, {value, 1.2}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2)
+    end).
+
+
+test_purge_complete(Db) ->
+    ?_test(begin
+        Result = run_query(Db, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1),
+        FDI2 = couch_db:get_full_doc_info(Db, <<"2">>), Rev2 = get_rev(FDI2),
+        FDI5 = couch_db:get_full_doc_info(Db, <<"5">>), Rev5 = get_rev(FDI5),
+
+        PurgeInfos = [
+            {<<"UUID1">>, <<"1">>, [Rev1]},
+            {<<"UUID2">>, <<"2">>, [Rev2]},
+            {<<"UUID5">>, <<"5">>, [Rev5]}
+        ],
+        {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+        {ok, Db2} = couch_db:reopen(Db),
+
+        Result2 = run_query(Db2, []),
+        Expect2 = {ok, [
+            {meta, [{total, 2}, {offset, 0}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}
+        ]},
+        ?assertEqual(Expect2, Result2)
+    end).
+
+
+test_purge_nochange(Db) ->
+    ?_test(begin
+        Result = run_query(Db, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        FDI1 = couch_db:get_full_doc_info(Db, <<"1">>),
+        Rev1 = get_rev(FDI1),
+
+        PurgeInfos = [
+            {<<"UUID1">>, <<"6">>, [Rev1]}
+        ],
+        {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+        {ok, Db2} = couch_db:reopen(Db),
+
+        Result2 = run_query(Db2, []),
+        Expect2 = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect2, Result2)
+    end).
+
+
+test_purge_index_reset(Db) ->
+    ?_test(begin
+        ok = couch_db:set_purge_infos_limit(Db, 2),
+        {ok, Db1} = couch_db:reopen(Db),
+
+        Result = run_query(Db1, []),
+        Expect = {ok, [
+            {meta, [{total, 5}, {offset, 0}]},
+            {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+            {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+            {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+            {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+            {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+        ]},
+        ?assertEqual(Expect, Result),
+
+        PurgeInfos = lists:map(fun(I) ->
+            DocId = list_to_binary(integer_to_list(I)),
+            FDI = couch_db:get_full_doc_info(Db, DocId),
+            Rev = get_rev(FDI),
+            {couch_uuids:random(), DocId, [Rev]}
+        end, lists:seq(1, 5)),
+        {ok, _} = couch_db:purge_docs(Db1, PurgeInfos),
+
+        {ok, Db2} = couch_db:reopen(Db1),
+
+        % Forcibly set the purge doc to a newer purge
+        % sequence to force an index reset. This should
+        % never happen in real life but the reset
+        % is required for correctness.
+        {ok, #doc{body = {OldProps}} = LocalDoc} = get_local_purge_doc(Db2),
+        NewPurgeSeq = {<<"purge_seq">>, 5},
+        NewProps = lists:keyreplace(<<"purge_seq">>, 1, OldProps, NewPurgeSeq),
+        RewindDoc = LocalDoc#doc{body = {NewProps}},
+        {ok, _} = couch_db:update_doc(Db2, RewindDoc, []),
+
+        % Compact the database to remove purge infos
+        {ok, _} = couch_db:start_compact(Db2),
+        wait_compaction(couch_db:name(Db), "database", ?LINE),
+
+        {ok, Db3} = couch_db:reopen(Db2),
+        Result2 = run_query(Db3, []),
+        Expect2 = {ok, [
+            {meta, [{total, 0}, {offset, 0}]}
+        ]},
+        ?assertEqual(Expect2, Result2),
+
+        % Assert that we had a reset
+        meck:wait(
+                1,
+                couch_index_updater,
+                handle_info,
+                [{'EXIT', '_', {reset, '_'}}, '_'],
+                5000
+            )
+    end).
+
+
+test_purge_compact_size_check(Db) ->
+    ?_test(begin
+        DbName = couch_db:name(Db),
+        Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+        {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+        _Result = run_query(Db1, []),
+        DiskSizeBefore = db_disk_size(DbName),
+
+        PurgedDocsNum = 150,
+        IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
+            Id1 = docid(Id),
+            FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+            Rev1 = get_rev(FDI1),
+            UUID1 = uuid(Id),
+            [{UUID1, Id1, [Rev1]} | CIdRevs]
+        end, [], lists:seq(1, PurgedDocsNum)),
+        {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
+
+        {ok, Db2} = couch_db:reopen(Db1),
+        _Result1 = run_query(Db2, []),
+        {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+                Db2,
+                0,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+        config:set("couchdb", "file_compression", "snappy", false),
+
+        {ok, Db3} = couch_db:open_int(DbName, []),
+        {ok, _CompactPid} = couch_db:start_compact(Db3),
+        wait_compaction(DbName, "database", ?LINE),
+        ok = couch_db:close(Db3),
+        DiskSizeAfter = db_disk_size(DbName),
+        ?assert(DiskSizeBefore > DiskSizeAfter)
+    end).
+
+
+test_purge_compact_for_stale_purge_cp_without_client(Db) ->
+    ?_test(begin
+        DbName = couch_db:name(Db),
+        % add more documents to database for purge
+        Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+        {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+
+        % change PurgedDocsLimit to 10 from 1000 to
+        % avoid timeout of eunit test
+        PurgedDocsLimit = 10,
+        couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
+
+        % purge 150 documents
+        PurgedDocsNum = 150,
+        PurgeInfos = lists:foldl(fun(Id, CIdRevs) ->
+            Id1 = docid(Id),
+            FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+            Rev1 = get_rev(FDI1),
+            UUID1 = uuid(Id),
+            [{UUID1, Id1, [Rev1]} | CIdRevs]
+        end, [], lists:seq(1, PurgedDocsNum)),
+        {ok, _} = couch_db:purge_docs(Db1, PurgeInfos),
+
+        {ok, Db2} = couch_db:reopen(Db1),
+        {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+                Db2,
+                0,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+
+        % run compaction to trigger pruning of purge tree
+        {ok, Db3} = couch_db:open_int(DbName, []),
+        {ok, _CompactPid} = couch_db:start_compact(Db3),
+        wait_compaction(DbName, "database", ?LINE),
+        ok = couch_db:close(Db3),
+
+        % check the remaining purge requests in purge tree
+        {ok, Db4} = couch_db:reopen(Db3),
+        {ok, OldestPSeq} = couch_db:get_oldest_purge_seq(Db4),
+        {ok, PurgedIdRevs2} = couch_db:fold_purge_infos(
+                Db4,
+                OldestPSeq - 1,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2))
+    end).
+
+
+test_purge_compact_for_stale_purge_cp_with_client(Db) ->
+    ?_test(begin
+        DbName = couch_db:name(Db),
+        % add more documents to database for purge
+        Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+        {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+
+        % change PurgedDocsLimit to 10 from 1000 to
+        % avoid timeout of eunit test
+        PurgedDocsLimit = 10,
+        couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
+        _Result = run_query(Db1, []),
+
+        % purge 150 documents
+        PurgedDocsNum = 150,
+        IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
+            Id1 = docid(Id),
+            FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+            Rev1 = get_rev(FDI1),
+            UUID1 = uuid(Id),
+            [{UUID1, Id1, [Rev1]} | CIdRevs]
+        end, [], lists:seq(1, PurgedDocsNum)),
+        {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
+
+        % run query again to reflect purge requests
+        % to mrview
+        {ok, Db2} = couch_db:reopen(Db1),
+        _Result1 = run_query(Db2, []),
+        {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+                Db2,
+                0,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+
+        % run compaction to trigger pruning of purge tree
+        {ok, Db3} = couch_db:open_int(DbName, []),
+        {ok, _CompactPid} = couch_db:start_compact(Db3),
+        wait_compaction(DbName, "database", ?LINE),
+        ok = couch_db:close(Db3),
+
+        % check the remaining purge requests in purge tree
+        {ok, Db4} = couch_db:reopen(Db3),
+        {ok, OldestPSeq} = couch_db:get_oldest_purge_seq(Db4),
+        {ok, PurgedIdRevs2} = couch_db:fold_purge_infos(
+                Db4,
+                OldestPSeq - 1,
+                fun fold_fun/2,
+                [],
+                []
+            ),
+        ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2))
+    end).
+
+
+get_local_purge_doc(Db) ->
+    {ok, DDoc} = couch_db:open_doc(Db, <<"_design/bar">>, []),
+    {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc),
+    Sig = IdxState#mrst.sig,
+    HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+    DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+    couch_db:open_doc(Db, DocId, []).
+
+
+run_query(Db, Opts) ->
+    couch_mrview:query_view(Db, <<"_design/bar">>, <<"baz">>, Opts).
+
+
+save_docs(Db, JsonDocs, Options) ->
+    Docs = lists:map(fun(JDoc) ->
+        couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE(JDoc)))
+                     end, JsonDocs),
+    Opts = [full_commit | Options],
+    case lists:member(replicated_changes, Options) of
+        true ->
+            {ok, []} = couch_db:update_docs(
+                Db, Docs, Opts, replicated_changes),
+            {ok, lists:map(fun(Doc) ->
+                {Pos, [RevId | _]} = Doc#doc.revs,
+                {Pos, RevId}
+                           end, Docs)};
+        false ->
+            {ok, Resp} = couch_db:update_docs(Db, Docs, Opts),
+            {ok, [Rev || {ok, Rev} <- Resp]}
+    end.
+
+
+get_rev(#full_doc_info{} = FDI) ->
+    #doc_info{
+        revs = [#rev_info{} = PrevRev | _]
+    } = couch_doc:to_doc_info(FDI),
+    PrevRev#rev_info.rev.
+
+
+db_disk_size(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, Info} = couch_db:get_db_info(Db),
+    ok = couch_db:close(Db),
+    active_size(Info).
+
+
+active_size(Info) ->
+    couch_util:get_nested_json_value({Info}, [sizes, active]).
+
+
+wait_compaction(DbName, Kind, Line) ->
+    WaitFun = fun() ->
+        case is_compaction_running(DbName) of
+            true -> wait;
+            false -> ok
+        end
+    end,
+    case test_util:wait(WaitFun, 10000) of
+        timeout ->
+            erlang:error({assertion_failed,
+                [{module, ?MODULE},
+                    {line, Line},
+                    {reason, "Timeout waiting for "
+                        ++ Kind
+                        ++ " database compaction"}]});
+        _ ->
+            ok
+    end.
+
+
+is_compaction_running(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    {ok, DbInfo} = couch_db:get_db_info(Db),
+    couch_db:close(Db),
+    couch_util:get_value(compact_running, DbInfo).
+
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+    {ok, [{Id, Revs} | Acc]}.
+
+
+docid(I) ->
+    list_to_binary(integer_to_list(I)).
+
+
+uuid(I) ->
+    Str = io_lib:format("UUID~4..0b", [I]),
+    iolist_to_binary(Str).

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.