You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ji...@apache.org on 2018/06/22 17:20:23 UTC

[couchdb] branch COUCHDB-3326-clustered-purge-pr5-implementation updated (f526511 -> a5c1d13)

This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a change to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


    omit f526511  Address Ilya's comments
     new a5c1d13  Address Ilya's comments

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f526511)
            \
             N -- N -- N   refs/heads/COUCHDB-3326-clustered-purge-pr5-implementation (a5c1d13)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch/src/couch_db.erl                  |  4 ++--
 src/couch_mrview/src/couch_mrview_index.erl | 33 ++++++++++++++++++++++-------
 2 files changed, 27 insertions(+), 10 deletions(-)


[couchdb] 01/01: Address Ilya's comments

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a5c1d1330c8051a9912871049ead60869745c4db
Author: jiangphcn <ji...@cn.ibm.com>
AuthorDate: Thu Jun 21 16:10:06 2018 +0800

    Address Ilya's comments
    
       - Use couch_epi:any/5 to call index verification func
       - Fixed issue in get_design_docs/1 where suffix of database
         needs to be stripped
       - swap "stop_couch/1" and "remove files"
       - refactor reason for throw bad_request
       - directly use meck:unload/0
    
    COUCHDB-3326
---
 src/couch/src/couch_db.erl                         | 39 ++----------
 src/couch/src/couch_db_plugin.erl                  |  6 ++
 src/couch/test/couch_bt_engine_upgrade_tests.erl   |  4 +-
 .../src/couch_index_plugin_couch_db.erl            |  5 ++
 src/couch_mrview/src/couch_mrview_index.erl        | 72 +++++++++++-----------
 src/couch_mrview/src/couch_mrview_util.erl         |  5 +-
 .../test/couch_mrview_purge_docs_fabric_tests.erl  |  2 +-
 .../test/couch_mrview_purge_docs_tests.erl         | 25 +++++---
 .../src/cpse_test_purge_bad_checkpoints.erl        | 51 ++-------------
 src/mem3/src/mem3_epi.erl                          |  3 +-
 .../src/mem3_plugin_couch_db.erl}                  |  8 +--
 src/mem3/src/mem3_rep.erl                          | 49 ++++++++-------
 12 files changed, 111 insertions(+), 158 deletions(-)

diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 2645ea2..9d9d164 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -81,7 +81,6 @@
 
     get_minimum_purge_seq/1,
     purge_client_exists/3,
-    get_purge_client_fun/2,
 
     update_doc/3,
     update_doc/4,
@@ -475,8 +474,7 @@ purge_client_exists(DbName, DocId, Props) ->
     LagThreshold = NowSecs - LagWindow,
 
     try
-        CheckFun = get_purge_client_fun(DocId, Props),
-        Exists = CheckFun(Props),
+        Exists = couch_db_plugin:is_valid_purge_client(Props),
         if not Exists -> ok; true ->
             Updated = couch_util:get_value(<<"updated_on">>, Props),
             if is_integer(Updated) and Updated > LagThreshold -> ok; true ->
@@ -490,41 +488,12 @@ purge_client_exists(DbName, DocId, Props) ->
         % If we fail to check for a client we have to assume that
         % it exists.
         Fmt2 = "Failed to check purge checkpoint using
-            document '~p' on database ~p",
+            document '~p' in database ~p",
         couch_log:error(Fmt2, [DbName, DocId]),
         true
     end.
 
 
-get_purge_client_fun(DocId, Props) ->
-    M0 = couch_util:get_value(<<"verify_module">>, Props),
-    M = try
-        binary_to_existing_atom(M0, latin1)
-    catch error:badarg ->
-        Fmt1 = "Missing index module '~p' for purge checkpoint '~s'",
-        couch_log:error(Fmt1, [M0, DocId]),
-        throw(failed)
-    end,
-
-    F0 = couch_util:get_value(<<"verify_function">>, Props),
-    try
-        F = binary_to_existing_atom(F0, latin1),
-        case erlang:function_exported(M, F, 1) of
-            true ->
-                fun M:F/1;
-            false ->
-                Fmt2 = "Missing exported function '~p' in '~p'
-                    for purge checkpoint '~s'",
-                couch_log:error(Fmt2, [F0, M0, DocId]),
-                throw(failed)
-        end
-    catch error:badarg ->
-        Fmt3 = "Missing function '~p' in '~p' for purge checkpoint '~s'",
-        couch_log:error(Fmt3, [F0, M0, DocId]),
-        throw(failed)
-    end.
-
-
 set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
     check_is_admin(Db),
     gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
@@ -634,7 +603,8 @@ get_db_info(Db) ->
     ],
     {ok, InfoList}.
 
-get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
+get_design_docs(#db{name = <<"shards/", _:18/binary, DbFullName/binary>>}) ->
+    DbName = ?l2b(filename:rootname(filename:basename(?b2l(DbFullName)))),
     {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
     receive {'DOWN', Ref, _, _, Response} ->
         Response
@@ -644,7 +614,6 @@ get_design_docs(#db{} = Db) ->
     {ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
     {ok, lists:reverse(Docs)}.
 
-
 check_is_admin(#db{user_ctx=UserCtx}=Db) ->
     case is_admin(Db) of
         true -> ok;
diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl
index 8163256..c0bcc2f 100644
--- a/src/couch/src/couch_db_plugin.erl
+++ b/src/couch/src/couch_db_plugin.erl
@@ -18,6 +18,7 @@
     after_doc_read/2,
     validate_docid/1,
     check_is_admin/1,
+    is_valid_purge_client/1,
     on_compact/2,
     on_delete/2
 ]).
@@ -57,6 +58,11 @@ check_is_admin(Db) ->
     %% callbacks return true only if it specifically allow the given Id
     couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []).
 
+is_valid_purge_client(Props) ->
+    Handle = couch_epi:get_handle(?SERVICE_ID),
+    %% callbacks return true only if it specifically allow the given Id
+    couch_epi:any(Handle, ?SERVICE_ID, is_valid_purge_client, [Props], []).
+
 on_compact(DbName, DDocs) ->
     Handle = couch_epi:get_handle(?SERVICE_ID),
     couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []).
diff --git a/src/couch/test/couch_bt_engine_upgrade_tests.erl b/src/couch/test/couch_bt_engine_upgrade_tests.erl
index 6aef366..8c748f8 100644
--- a/src/couch/test/couch_bt_engine_upgrade_tests.erl
+++ b/src/couch/test/couch_bt_engine_upgrade_tests.erl
@@ -37,10 +37,10 @@ setup() ->
 
 
 teardown({Ctx, Paths}) ->
+    test_util:stop_couch(Ctx),
     lists:foreach(fun(Path) ->
         file:delete(Path)
-    end, Paths),
-    test_util:stop_couch(Ctx).
+    end, Paths).
 
 
 upgrade_test_() ->
diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl
index 937f7c8..5d4a6ac 100644
--- a/src/couch_index/src/couch_index_plugin_couch_db.erl
+++ b/src/couch_index/src/couch_index_plugin_couch_db.erl
@@ -13,9 +13,14 @@
 -module(couch_index_plugin_couch_db).
 
 -export([
+    is_valid_purge_client/1,
     on_compact/2
 ]).
 
 
+is_valid_purge_client(Props) ->
+    couch_mrview_index:verify_index_exists(Props).
+
+
 on_compact(DbName, DDocs) ->
     couch_mrview_index:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 7756f52..d6558e1 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -220,48 +220,50 @@ index_file_exists(State) ->
 
 
 verify_index_exists(Props) ->
-    ShardDbName = couch_mrview_util:get_value_from_options(
-        <<"dbname">>,
-        Props
-    ),
-    DDocId = couch_mrview_util:get_value_from_options(
-        <<"ddoc_id">>,
-        Props
-    ),
-    SigInLocal = couch_mrview_util:get_value_from_options(
-        <<"signature">>,
-        Props
-    ),
-    case couch_db:open_int(ShardDbName, []) of
-        {ok, Db} ->
-            try
-                DbName = mem3:dbname(couch_db:name(Db)),
-                case ddoc_cache:open(DbName, DDocId) of
-                    {ok, DDoc} ->
+    try
+        Type = couch_util:get_value(<<"type">>, Props),
+        if Type =/= <<"mrview">> -> false; true ->
+            DbName = couch_util:get_value(<<"dbname">>, Props),
+            DDocId = couch_util:get_value(<<"ddoc_id">>, Props),
+            couch_util:with_db(DbName, fun(Db) ->
+                {ok, DesignDocs} = couch_db:get_design_docs(Db),
+                case get_ddoc(DbName, DesignDocs, DDocId) of
+                    #doc{} = DDoc ->
                         {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(
-                            ShardDbName,
-                            DDoc
-                        ),
+                            DbName, DDoc),
                         IdxSig = IdxState#mrst.sig,
+                        SigInLocal = couch_util:get_value(
+                            <<"signature">>, Props),
                         couch_index_util:hexsig(IdxSig) == SigInLocal;
-                    _Else ->
+                    not_found ->
                         false
                 end
-            catch E:T ->
-                Stack = erlang:get_stacktrace(),
-                couch_log:error(
-                    "Error occurs when verifying existence of ~s/~s :: ~p ~p",
-                    [ShardDbName, DDocId, {E, T}, Stack]
-                ),
-                false
-            after
-                catch couch_db:close(Db)
-            end;
-        _ ->
-            false
+            end)
+        end
+    catch _:_ ->
+        false
     end.
 
 
+get_ddoc(<<"shards/", _/binary>> = _DbName, DesignDocs, DDocId) ->
+    DDocs = [couch_doc:from_json_obj(DD) || DD <- DesignDocs],
+    case lists:keyfind(DDocId, #doc.id, DDocs) of
+        #doc{} = DDoc -> DDoc;
+        false -> not_found
+    end;
+get_ddoc(DbName, DesignDocs, DDocId) ->
+    couch_util:with_db(DbName, fun(Db) ->
+        case lists:keyfind(DDocId, #full_doc_info.id, DesignDocs) of
+            #full_doc_info{} = DDocInfo ->
+                {ok, DDoc} = couch_db:open_doc_int(
+                    Db, DDocInfo, [ejson_body]),
+                    DDoc;
+            false ->
+                not_found
+        end
+    end).
+
+
 ensure_local_purge_docs(DbName, DDocs) ->
     couch_util:with_db(DbName, fun(Db) ->
         lists:foreach(fun(DDoc) ->
@@ -305,8 +307,6 @@ update_local_purge_doc(Db, State, PSeq) ->
         {<<"type">>, <<"mrview">>},
         {<<"purge_seq">>, PSeq},
         {<<"updated_on">>, NowSecs},
-        {<<"verify_module">>, <<"couch_mrview_index">>},
-        {<<"verify_function">>, <<"verify_index_exists">>},
         {<<"dbname">>, get(db_name, State)},
         {<<"ddoc_id">>, get(idx_name, State)},
         {<<"signature">>, Sig}
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index b274961..a9ae661 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -44,14 +44,13 @@
 
 
 get_local_purge_doc_id(Sig) ->
-    Version = "v" ++ config:get("purge", "version", "1") ++ "-",
-    ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Version ++ Sig).
+    ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig).
 
 
 get_value_from_options(Key, Options) ->
     case couch_util:get_value(Key, Options) of
         undefined ->
-            Reason = binary_to_list(Key) ++ " must exist in Options.",
+            Reason = <<"'", Key/binary, "' must exists in options.">>,
             throw({bad_request, Reason});
         Value -> Value
     end.
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
index 99bfa9e..6b0d4d9 100644
--- a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
@@ -31,7 +31,7 @@ setup() ->
 
 
 teardown(DbName) ->
-    meck:unload(couch_mrview_index),
+    meck:unload(),
     ok = fabric:delete_db(DbName, [?ADMIN_CTX]).
 
 
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
index d5bf9f6..a7fb34f 100644
--- a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
@@ -361,20 +361,19 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) ->
         couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
         _Result = run_query(Db1, []),
 
-        % purge 150 documents
-        PurgedDocsNum = 150,
+        % first purge 30 documents
+        PurgedDocsNum1 = 30,
         IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
             Id1 = docid(Id),
             FDI1 = couch_db:get_full_doc_info(Db1, Id1),
             Rev1 = get_rev(FDI1),
             UUID1 = uuid(Id),
             [{UUID1, Id1, [Rev1]} | CIdRevs]
-        end, [], lists:seq(1, PurgedDocsNum)),
+        end, [], lists:seq(1, PurgedDocsNum1)),
         {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
 
-        % run query again to reflect purge requests
-        % to mrview
         {ok, Db2} = couch_db:reopen(Db1),
+        % run query again to reflect purge request to mrview
         _Result1 = run_query(Db2, []),
         {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
                 Db2,
@@ -383,9 +382,21 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) ->
                 [],
                 []
             ),
-        ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+        ?assertEqual(PurgedDocsNum1, length(PurgedIdRevs)),
+
+        % then purge 120 documents
+        PurgedDocsNum2 = 150,
+        IdsRevs2 = lists:foldl(fun(Id, CIdRevs) ->
+            Id1 = docid(Id),
+            FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+            Rev1 = get_rev(FDI1),
+            UUID1 = uuid(Id),
+            [{UUID1, Id1, [Rev1]} | CIdRevs]
+        end, [], lists:seq(PurgedDocsNum1 + 1, PurgedDocsNum2)),
+        {ok, _} = couch_db:purge_docs(Db2, IdsRevs2),
 
         % run compaction to trigger pruning of purge tree
+        % only the first 30 purge requests are pruned
         {ok, Db3} = couch_db:open_int(DbName, []),
         {ok, _CompactPid} = couch_db:start_compact(Db3),
         wait_compaction(DbName, "database", ?LINE),
@@ -401,7 +412,7 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) ->
                 [],
                 []
             ),
-        ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2))
+        ?assertEqual(PurgedDocsNum2 - PurgedDocsNum1, length(PurgedIdRevs2))
     end).
 
 
diff --git a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl
index 3d5edb1..c7a85c7 100644
--- a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl
+++ b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl
@@ -48,7 +48,7 @@ teardown_each(Db) ->
 
 
 cpse_bad_purge_seq(Db1) ->
-    Db2 = save_local_doc(Db1, <<"foo">>, ?MODULE, valid_fun),
+    Db2 = save_local_doc(Db1, <<"foo">>),
     ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
 
     ok = couch_db:set_purge_infos_limit(Db2, 5),
@@ -56,66 +56,25 @@ cpse_bad_purge_seq(Db1) ->
     ?assertEqual(1, couch_db:get_minimum_purge_seq(Db3)).
 
 
-cpse_bad_verify_mod(Db1) ->
-    Db2 = save_local_doc(Db1, 2, [invalid_module], valid_fun),
-    ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
-
-    ok = couch_db:set_purge_infos_limit(Db2, 5),
-    {ok, Db3} = couch_db:reopen(Db2),
-    ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-
-
-cpse_bad_verify_fun(Db1) ->
-    Db2 = save_local_doc(Db1, 2, ?MODULE, [invalid_function]),
-    ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
-
-    ok = couch_db:set_purge_infos_limit(Db2, 5),
-    {ok, Db3} = couch_db:reopen(Db2),
-    ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-
-
-cpse_verify_fun_throws(Db1) ->
-    Db2 = save_local_doc(Db1, 2, ?MODULE, throw_fun),
-    ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
-
-    ok = couch_db:set_purge_infos_limit(Db2, 5),
-    {ok, Db3} = couch_db:reopen(Db2),
-    ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-
-
 cpse_verify_non_boolean(Db1) ->
-    Db2 = save_local_doc(Db1, 2, ?MODULE, non_bool_fun),
+    Db2 = save_local_doc(Db1, 2),
     ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
 
     ok = couch_db:set_purge_infos_limit(Db2, 5),
     {ok, Db3} = couch_db:reopen(Db2),
-    ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
+    ?assertEqual(5, couch_db:get_minimum_purge_seq(Db3)).
 
 
-save_local_doc(Db1, PurgeSeq, Mod, Fun) ->
+save_local_doc(Db1, PurgeSeq) ->
     {Mega, Secs, _} = os:timestamp(),
     NowSecs = Mega * 1000000 + Secs,
     Doc = couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE({[
-        {<<"_id">>, <<"_local/purge-test-stuff-v1">>},
+        {<<"_id">>, <<"_local/purge-test-stuff">>},
         {<<"purge_seq">>, PurgeSeq},
         {<<"timestamp_utc">>, NowSecs},
-        {<<"verify_module">>, Mod},
-        {<<"verify_function">>, Fun},
         {<<"verify_options">>, {[{<<"signature">>, <<"stuff">>}]}},
         {<<"type">>, <<"test">>}
     ]}))),
     {ok, _} = couch_db:update_doc(Db1, Doc, []),
     {ok, Db2} = couch_db:reopen(Db1),
     Db2.
-
-
-valid_fun(_Options) ->
-    true.
-
-
-throw_fun(_Options) ->
-    throw(failed).
-
-
-not_bool(_Options) ->
-    ok.
diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl
index ebcd596..4bf2bf5 100644
--- a/src/mem3/src/mem3_epi.erl
+++ b/src/mem3/src/mem3_epi.erl
@@ -30,7 +30,8 @@ app() ->
 
 providers() ->
     [
-         {chttpd_handlers, mem3_httpd_handlers}
+        {couch_db, mem3_plugin_couch_db},
+        {chttpd_handlers, mem3_httpd_handlers}
     ].
 
 
diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/mem3/src/mem3_plugin_couch_db.erl
similarity index 79%
copy from src/couch_index/src/couch_index_plugin_couch_db.erl
copy to src/mem3/src/mem3_plugin_couch_db.erl
index 937f7c8..f19f5eb 100644
--- a/src/couch_index/src/couch_index_plugin_couch_db.erl
+++ b/src/mem3/src/mem3_plugin_couch_db.erl
@@ -10,12 +10,12 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(couch_index_plugin_couch_db).
+-module(mem3_plugin_couch_db).
 
 -export([
-    on_compact/2
+    is_valid_purge_client/1
 ]).
 
 
-on_compact(DbName, DDocs) ->
-    couch_mrview_index:ensure_local_purge_docs(DbName, DDocs).
+is_valid_purge_client(Props) ->
+    mem3_rep:verify_purge_checkpoint(Props).
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 3f224cd..22a3f7a 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -122,32 +122,37 @@ make_local_id(SourceThing, TargetThing, Filter) ->
 
 
 make_purge_id(SourceUUID, TargetUUID) ->
-    Version = "v" ++ config:get("purge", "version", "1") ++ "-",
-    ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mem3-" ++ Version ++
-        ?b2l(SourceUUID) ++ "-" ++ ?b2l(TargetUUID)).
+    <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
 
 
 verify_purge_checkpoint(Props) ->
-    DbName = couch_util:get_value(<<"dbname">>, Props),
-    SourceBin = couch_util:get_value(<<"source">>, Props),
-    TargetBin = couch_util:get_value(<<"target">>, Props),
-    Range = couch_util:get_value(<<"range">>, Props),
-
-    Source = binary_to_existing_atom(SourceBin, latin1),
-    Target = binary_to_existing_atom(TargetBin, latin1),
-
     try
-        Shards = mem3:shards(DbName),
-        Nodes = lists:foldl(fun(Shard, Acc) ->
-            case Shard#shard.range == Range of
-                true -> [Shard#shard.node | Acc];
-                false -> Acc
+        Type = couch_util:get_value(<<"type">>, Props),
+        if Type =/= <<"internal_replication">> -> false; true ->
+            DbName = couch_util:get_value(<<"dbname">>, Props),
+            SourceBin = couch_util:get_value(<<"source">>, Props),
+            TargetBin = couch_util:get_value(<<"target">>, Props),
+            Range = couch_util:get_value(<<"range">>, Props),
+
+            Source = binary_to_existing_atom(SourceBin, latin1),
+            Target = binary_to_existing_atom(TargetBin, latin1),
+
+            try
+                Shards = mem3:shards(DbName),
+                Nodes = lists:foldl(fun(Shard, Acc) ->
+                    case Shard#shard.range == Range of
+                        true -> [Shard#shard.node | Acc];
+                        false -> Acc
+                    end
+                end, [], mem3:shards(DbName)),
+                lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
+            catch
+                error:database_does_not_exist ->
+                    false
             end
-        end, [], mem3:shards(DbName)),
-        lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
-    catch
-        error:database_does_not_exist ->
-            false
+        end
+    catch _:_ ->
+        false
     end.
 
 
@@ -500,8 +505,6 @@ purge_cp_body(#acc{} = Acc, PurgeSeq) ->
         {<<"type">>, <<"internal_replication">>},
         {<<"updated_on">>, NowSecs},
         {<<"purge_seq">>, PurgeSeq},
-        {<<"verify_module">>, <<"mem3_rep">>},
-        {<<"verify_function">>, <<"verify_purge_checkpoint">>},
         {<<"dbname">>, Source#shard.dbname},
         {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
         {<<"target">>, atom_to_binary(Target#shard.node, latin1)},