Posted to commits@couchdb.apache.org by ji...@apache.org on 2018/06/22 14:24:31 UTC

[couchdb] 01/01: Address Ilya's comments

This is an automated email from the ASF dual-hosted git repository.

jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 2af5b5a2b53cc26f84b4a7fed81636aab5f9e46f
Author: jiangphcn <ji...@cn.ibm.com>
AuthorDate: Thu Jun 21 16:10:06 2018 +0800

    Address Ilya's comments
    
       - Use couch_epi:any/5 to call the index verification function
         (a provider-side sketch follows the file summary below)
       - Swap "stop_couch/1" and "remove files" in test teardown
       - Refactor the reason for throwing bad_request
       - Directly use meck:unload/0
    
    COUCHDB-3326
---
 src/couch/src/couch_db.erl                         | 44 +++--------------
 src/couch/src/couch_db_plugin.erl                  |  6 +++
 src/couch/test/couch_bt_engine_upgrade_tests.erl   |  4 +-
 .../src/couch_index_plugin_couch_db.erl            |  5 ++
 src/couch_mrview/src/couch_mrview_index.erl        | 55 ++++++++--------------
 src/couch_mrview/src/couch_mrview_util.erl         |  5 +-
 .../test/couch_mrview_purge_docs_fabric_tests.erl  |  2 +-
 .../test/couch_mrview_purge_docs_tests.erl         | 25 +++++++---
 .../src/cpse_test_purge_bad_checkpoints.erl        | 49 ++-----------------
 src/mem3/src/mem3_epi.erl                          |  3 +-
 .../src/mem3_plugin_couch_db.erl}                  |  8 ++--
 src/mem3/src/mem3_rep.erl                          | 49 ++++++++++---------
 12 files changed, 96 insertions(+), 159 deletions(-)
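
The couch_db_plugin change below replaces the per-checkpoint "verify_module" /
"verify_function" lookup with couch_epi:any/5, which calls
is_valid_purge_client/1 on every registered couch_db provider and returns true
if any provider accepts the checkpoint. A minimal provider-side sketch, with
the module name example_plugin_couch_db and the <<"example">> checkpoint type
purely illustrative (not part of this commit):

    -module(example_plugin_couch_db).

    -export([
        is_valid_purge_client/1
    ]).

    %% Invoked via couch_epi:any/5 from couch_db_plugin:is_valid_purge_client/1.
    %% Return true only for purge checkpoints this application still owns;
    %% checkpoints whose provider returns false stop constraining the
    %% minimum purge sequence.
    is_valid_purge_client(Props) ->
        case couch_util:get_value(<<"type">>, Props) of
            <<"example">> ->
                is_binary(couch_util:get_value(<<"dbname">>, Props));
            _ ->
                false
        end.

Such a provider would be registered for the couch_db service the same way
mem3_epi registers mem3_plugin_couch_db in the later hunks of this patch.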

diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 2645ea2..2a3eb85 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -81,7 +81,6 @@
 
     get_minimum_purge_seq/1,
     purge_client_exists/3,
-    get_purge_client_fun/2,
 
     update_doc/3,
     update_doc/4,
@@ -451,6 +450,7 @@ get_minimum_purge_seq(#db{} = Db) ->
         {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
     ],
     {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
+
     FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of
         true -> MinIdxSeq;
         false -> erlang:max(0, PurgeSeq - PurgeInfosLimit)
@@ -475,56 +475,26 @@ purge_client_exists(DbName, DocId, Props) ->
     LagThreshold = NowSecs - LagWindow,
 
     try
-        CheckFun = get_purge_client_fun(DocId, Props),
-        Exists = CheckFun(Props),
+        Exists = couch_db_plugin:is_valid_purge_client(Props),
         if not Exists -> ok; true ->
             Updated = couch_util:get_value(<<"updated_on">>, Props),
             if is_integer(Updated) and Updated > LagThreshold -> ok; true ->
                 Diff = NowSecs - Updated,
-                Fmt1 = "Purge checkpoint '~s' not updated in ~p seconds",
-                couch_log:error(Fmt1, [DocId, Diff])
+                Fmt2 = "Purge checkpoint '~s' not updated in ~p seconds",
+                couch_log:error(Fmt2, [DocId, Diff])
             end
         end,
         Exists
     catch _:_ ->
         % If we fail to check for a client we have to assume that
         % it exists.
-        Fmt2 = "Failed to check purge checkpoint using
-            document '~p' on database ~p",
-        couch_log:error(Fmt2, [DbName, DocId]),
+        Fmt3 = "Failed to check purge checkpoint using "
+            "document '~p' in database ~p",
+        couch_log:error(Fmt3, [DocId, DbName]),
         true
     end.
 
 
-get_purge_client_fun(DocId, Props) ->
-    M0 = couch_util:get_value(<<"verify_module">>, Props),
-    M = try
-        binary_to_existing_atom(M0, latin1)
-    catch error:badarg ->
-        Fmt1 = "Missing index module '~p' for purge checkpoint '~s'",
-        couch_log:error(Fmt1, [M0, DocId]),
-        throw(failed)
-    end,
-
-    F0 = couch_util:get_value(<<"verify_function">>, Props),
-    try
-        F = binary_to_existing_atom(F0, latin1),
-        case erlang:function_exported(M, F, 1) of
-            true ->
-                fun M:F/1;
-            false ->
-                Fmt2 = "Missing exported function '~p' in '~p'
-                    for purge checkpoint '~s'",
-                couch_log:error(Fmt2, [F0, M0, DocId]),
-                throw(failed)
-        end
-    catch error:badarg ->
-        Fmt3 = "Missing function '~p' in '~p' for purge checkpoint '~s'",
-        couch_log:error(Fmt3, [F0, M0, DocId]),
-        throw(failed)
-    end.
-
-
 set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
     check_is_admin(Db),
     gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl
index 8163256..c0bcc2f 100644
--- a/src/couch/src/couch_db_plugin.erl
+++ b/src/couch/src/couch_db_plugin.erl
@@ -18,6 +18,7 @@
     after_doc_read/2,
     validate_docid/1,
     check_is_admin/1,
+    is_valid_purge_client/1,
     on_compact/2,
     on_delete/2
 ]).
@@ -57,6 +58,11 @@ check_is_admin(Db) ->
     %% callbacks return true only if it specifically allow the given Id
     couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []).
 
+is_valid_purge_client(Props) ->
+    Handle = couch_epi:get_handle(?SERVICE_ID),
+    %% callbacks return true only if a provider validates the purge client
+    couch_epi:any(Handle, ?SERVICE_ID, is_valid_purge_client, [Props], []).
+
 on_compact(DbName, DDocs) ->
     Handle = couch_epi:get_handle(?SERVICE_ID),
     couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []).
diff --git a/src/couch/test/couch_bt_engine_upgrade_tests.erl b/src/couch/test/couch_bt_engine_upgrade_tests.erl
index 6aef366..8c748f8 100644
--- a/src/couch/test/couch_bt_engine_upgrade_tests.erl
+++ b/src/couch/test/couch_bt_engine_upgrade_tests.erl
@@ -37,10 +37,10 @@ setup() ->
 
 
 teardown({Ctx, Paths}) ->
+    test_util:stop_couch(Ctx),
     lists:foreach(fun(Path) ->
         file:delete(Path)
-    end, Paths),
-    test_util:stop_couch(Ctx).
+    end, Paths).
 
 
 upgrade_test_() ->
diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl
index 937f7c8..5d4a6ac 100644
--- a/src/couch_index/src/couch_index_plugin_couch_db.erl
+++ b/src/couch_index/src/couch_index_plugin_couch_db.erl
@@ -13,9 +13,14 @@
 -module(couch_index_plugin_couch_db).
 
 -export([
+    is_valid_purge_client/1,
     on_compact/2
 ]).
 
 
+is_valid_purge_client(Props) ->
+    couch_mrview_index:verify_index_exists(Props).
+
+
 on_compact(DbName, DDocs) ->
     couch_mrview_index:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 7756f52..8ab29b9 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -220,45 +220,30 @@ index_file_exists(State) ->
 
 
 verify_index_exists(Props) ->
-    ShardDbName = couch_mrview_util:get_value_from_options(
-        <<"dbname">>,
-        Props
-    ),
-    DDocId = couch_mrview_util:get_value_from_options(
-        <<"ddoc_id">>,
-        Props
-    ),
-    SigInLocal = couch_mrview_util:get_value_from_options(
-        <<"signature">>,
-        Props
-    ),
-    case couch_db:open_int(ShardDbName, []) of
-        {ok, Db} ->
-            try
-                DbName = mem3:dbname(couch_db:name(Db)),
-                case ddoc_cache:open(DbName, DDocId) of
-                    {ok, DDoc} ->
+    try
+        Type = couch_util:get_value(<<"type">>, Props),
+        if Type =/= <<"mrview">> -> false; true ->
+            ShardDbName = couch_util:get_value(<<"dbname">>, Props),
+            DDocId = couch_util:get_value(<<"ddoc_id">>, Props),
+            couch_util:with_db(ShardDbName, fun(Db) ->
+                {ok, DesignDocs} = couch_db:get_design_docs(Db),
+                case lists:keyfind(DDocId, #full_doc_info.id, DesignDocs) of
+                    #full_doc_info{} = DDocInfo ->
+                        {ok, DDoc} = couch_db:open_doc_int(
+                            Db, DDocInfo, [ejson_body]),
                         {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(
-                            ShardDbName,
-                            DDoc
-                        ),
+                            ShardDbName, DDoc),
                         IdxSig = IdxState#mrst.sig,
+                        SigInLocal = couch_util:get_value(
+                            <<"signature">>, Props),
                         couch_index_util:hexsig(IdxSig) == SigInLocal;
-                    _Else ->
+                    false ->
                         false
                 end
-            catch E:T ->
-                Stack = erlang:get_stacktrace(),
-                couch_log:error(
-                    "Error occurs when verifying existence of ~s/~s :: ~p ~p",
-                    [ShardDbName, DDocId, {E, T}, Stack]
-                ),
-                false
-            after
-                catch couch_db:close(Db)
-            end;
-        _ ->
-            false
+            end)
+        end
+    catch _:_ ->
+        false
     end.
 
 
@@ -305,8 +290,6 @@ update_local_purge_doc(Db, State, PSeq) ->
         {<<"type">>, <<"mrview">>},
         {<<"purge_seq">>, PSeq},
         {<<"updated_on">>, NowSecs},
-        {<<"verify_module">>, <<"couch_mrview_index">>},
-        {<<"verify_function">>, <<"verify_index_exists">>},
         {<<"dbname">>, get(db_name, State)},
         {<<"ddoc_id">>, get(idx_name, State)},
         {<<"signature">>, Sig}
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index b274961..a9ae661 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -44,14 +44,13 @@
 
 
 get_local_purge_doc_id(Sig) ->
-    Version = "v" ++ config:get("purge", "version", "1") ++ "-",
-    ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Version ++ Sig).
+    ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig).
 
 
 get_value_from_options(Key, Options) ->
     case couch_util:get_value(Key, Options) of
         undefined ->
-            Reason = binary_to_list(Key) ++ " must exist in Options.",
+            Reason = <<"'", Key/binary, "' must exist in options.">>,
             throw({bad_request, Reason});
         Value -> Value
     end.
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
index 99bfa9e..6b0d4d9 100644
--- a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
@@ -31,7 +31,7 @@ setup() ->
 
 
 teardown(DbName) ->
-    meck:unload(couch_mrview_index),
+    meck:unload(),
     ok = fabric:delete_db(DbName, [?ADMIN_CTX]).
 
 
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
index d5bf9f6..a7fb34f 100644
--- a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
@@ -361,20 +361,19 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) ->
         couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
         _Result = run_query(Db1, []),
 
-        % purge 150 documents
-        PurgedDocsNum = 150,
+        % first purge 30 documents
+        PurgedDocsNum1 = 30,
         IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
             Id1 = docid(Id),
             FDI1 = couch_db:get_full_doc_info(Db1, Id1),
             Rev1 = get_rev(FDI1),
             UUID1 = uuid(Id),
             [{UUID1, Id1, [Rev1]} | CIdRevs]
-        end, [], lists:seq(1, PurgedDocsNum)),
+        end, [], lists:seq(1, PurgedDocsNum1)),
         {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
 
-        % run query again to reflect purge requests
-        % to mrview
         {ok, Db2} = couch_db:reopen(Db1),
+        % run query again to reflect purge requests to mrview
         _Result1 = run_query(Db2, []),
         {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
                 Db2,
@@ -383,9 +382,21 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) ->
                 [],
                 []
             ),
-        ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+        ?assertEqual(PurgedDocsNum1, length(PurgedIdRevs)),
+
+        % then purge another 120 documents (31 through 150)
+        PurgedDocsNum2 = 150,
+        IdsRevs2 = lists:foldl(fun(Id, CIdRevs) ->
+            Id1 = docid(Id),
+            FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+            Rev1 = get_rev(FDI1),
+            UUID1 = uuid(Id),
+            [{UUID1, Id1, [Rev1]} | CIdRevs]
+        end, [], lists:seq(PurgedDocsNum1 + 1, PurgedDocsNum2)),
+        {ok, _} = couch_db:purge_docs(Db2, IdsRevs2),
 
         % run compaction to trigger pruning of purge tree
+        % only the first 30 purge requests are pruned
         {ok, Db3} = couch_db:open_int(DbName, []),
         {ok, _CompactPid} = couch_db:start_compact(Db3),
         wait_compaction(DbName, "database", ?LINE),
@@ -401,7 +412,7 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) ->
                 [],
                 []
             ),
-        ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2))
+        ?assertEqual(PurgedDocsNum2 - PurgedDocsNum1, length(PurgedIdRevs2))
     end).
 
 
diff --git a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl
index 3d5edb1..b511e01 100644
--- a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl
+++ b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl
@@ -48,7 +48,7 @@ teardown_each(Db) ->
 
 
 cpse_bad_purge_seq(Db1) ->
-    Db2 = save_local_doc(Db1, <<"foo">>, ?MODULE, valid_fun),
+    Db2 = save_local_doc(Db1, <<"foo">>),
     ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
 
     ok = couch_db:set_purge_infos_limit(Db2, 5),
@@ -56,35 +56,8 @@ cpse_bad_purge_seq(Db1) ->
     ?assertEqual(1, couch_db:get_minimum_purge_seq(Db3)).
 
 
-cpse_bad_verify_mod(Db1) ->
-    Db2 = save_local_doc(Db1, 2, [invalid_module], valid_fun),
-    ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
-
-    ok = couch_db:set_purge_infos_limit(Db2, 5),
-    {ok, Db3} = couch_db:reopen(Db2),
-    ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-
-
-cpse_bad_verify_fun(Db1) ->
-    Db2 = save_local_doc(Db1, 2, ?MODULE, [invalid_function]),
-    ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
-
-    ok = couch_db:set_purge_infos_limit(Db2, 5),
-    {ok, Db3} = couch_db:reopen(Db2),
-    ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-
-
-cpse_verify_fun_throws(Db1) ->
-    Db2 = save_local_doc(Db1, 2, ?MODULE, throw_fun),
-    ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
-
-    ok = couch_db:set_purge_infos_limit(Db2, 5),
-    {ok, Db3} = couch_db:reopen(Db2),
-    ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-
-
 cpse_verify_non_boolean(Db1) ->
-    Db2 = save_local_doc(Db1, 2, ?MODULE, non_bool_fun),
+    Db2 = save_local_doc(Db1, 2),
     ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
 
     ok = couch_db:set_purge_infos_limit(Db2, 5),
@@ -92,30 +65,16 @@ cpse_verify_non_boolean(Db1) ->
     ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
 
 
-save_local_doc(Db1, PurgeSeq, Mod, Fun) ->
+save_local_doc(Db1, PurgeSeq) ->
     {Mega, Secs, _} = os:timestamp(),
     NowSecs = Mega * 1000000 + Secs,
     Doc = couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE({[
-        {<<"_id">>, <<"_local/purge-test-stuff-v1">>},
+        {<<"_id">>, <<"_local/purge-test-stuff">>},
         {<<"purge_seq">>, PurgeSeq},
         {<<"timestamp_utc">>, NowSecs},
-        {<<"verify_module">>, Mod},
-        {<<"verify_function">>, Fun},
         {<<"verify_options">>, {[{<<"signature">>, <<"stuff">>}]}},
         {<<"type">>, <<"test">>}
     ]}))),
     {ok, _} = couch_db:update_doc(Db1, Doc, []),
     {ok, Db2} = couch_db:reopen(Db1),
     Db2.
-
-
-valid_fun(_Options) ->
-    true.
-
-
-throw_fun(_Options) ->
-    throw(failed).
-
-
-not_bool(_Options) ->
-    ok.
diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl
index ebcd596..4bf2bf5 100644
--- a/src/mem3/src/mem3_epi.erl
+++ b/src/mem3/src/mem3_epi.erl
@@ -30,7 +30,8 @@ app() ->
 
 providers() ->
     [
-         {chttpd_handlers, mem3_httpd_handlers}
+        {couch_db, mem3_plugin_couch_db},
+        {chttpd_handlers, mem3_httpd_handlers}
     ].
 
 
diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/mem3/src/mem3_plugin_couch_db.erl
similarity index 79%
copy from src/couch_index/src/couch_index_plugin_couch_db.erl
copy to src/mem3/src/mem3_plugin_couch_db.erl
index 937f7c8..f19f5eb 100644
--- a/src/couch_index/src/couch_index_plugin_couch_db.erl
+++ b/src/mem3/src/mem3_plugin_couch_db.erl
@@ -10,12 +10,12 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(couch_index_plugin_couch_db).
+-module(mem3_plugin_couch_db).
 
 -export([
-    on_compact/2
+    is_valid_purge_client/1
 ]).
 
 
-on_compact(DbName, DDocs) ->
-    couch_mrview_index:ensure_local_purge_docs(DbName, DDocs).
+is_valid_purge_client(Props) ->
+    mem3_rep:verify_purge_checkpoint(Props).
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 3f224cd..22a3f7a 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -122,32 +122,37 @@ make_local_id(SourceThing, TargetThing, Filter) ->
 
 
 make_purge_id(SourceUUID, TargetUUID) ->
-    Version = "v" ++ config:get("purge", "version", "1") ++ "-",
-    ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mem3-" ++ Version ++
-        ?b2l(SourceUUID) ++ "-" ++ ?b2l(TargetUUID)).
+    <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
 
 
 verify_purge_checkpoint(Props) ->
-    DbName = couch_util:get_value(<<"dbname">>, Props),
-    SourceBin = couch_util:get_value(<<"source">>, Props),
-    TargetBin = couch_util:get_value(<<"target">>, Props),
-    Range = couch_util:get_value(<<"range">>, Props),
-
-    Source = binary_to_existing_atom(SourceBin, latin1),
-    Target = binary_to_existing_atom(TargetBin, latin1),
-
     try
-        Shards = mem3:shards(DbName),
-        Nodes = lists:foldl(fun(Shard, Acc) ->
-            case Shard#shard.range == Range of
-                true -> [Shard#shard.node | Acc];
-                false -> Acc
+        Type = couch_util:get_value(<<"type">>, Props),
+        if Type =/= <<"internal_replication">> -> false; true ->
+            DbName = couch_util:get_value(<<"dbname">>, Props),
+            SourceBin = couch_util:get_value(<<"source">>, Props),
+            TargetBin = couch_util:get_value(<<"target">>, Props),
+            Range = couch_util:get_value(<<"range">>, Props),
+
+            Source = binary_to_existing_atom(SourceBin, latin1),
+            Target = binary_to_existing_atom(TargetBin, latin1),
+
+            try
+                Shards = mem3:shards(DbName),
+                Nodes = lists:foldl(fun(Shard, Acc) ->
+                    case Shard#shard.range == Range of
+                        true -> [Shard#shard.node | Acc];
+                        false -> Acc
+                    end
+                end, [], Shards),
+                lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
+            catch
+                error:database_does_not_exist ->
+                    false
             end
-        end, [], mem3:shards(DbName)),
-        lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
-    catch
-        error:database_does_not_exist ->
-            false
+        end
+    catch _:_ ->
+        false
     end.
 
 
@@ -500,8 +505,6 @@ purge_cp_body(#acc{} = Acc, PurgeSeq) ->
         {<<"type">>, <<"internal_replication">>},
         {<<"updated_on">>, NowSecs},
         {<<"purge_seq">>, PurgeSeq},
-        {<<"verify_module">>, <<"mem3_rep">>},
-        {<<"verify_function">>, <<"verify_purge_checkpoint">>},
         {<<"dbname">>, Source#shard.dbname},
         {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
         {<<"target">>, atom_to_binary(Target#shard.node, latin1)},