You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ji...@apache.org on 2018/06/22 14:24:31 UTC
[couchdb] 01/01: Address Ilya's comments
This is an automated email from the ASF dual-hosted git repository.
jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 2af5b5a2b53cc26f84b4a7fed81636aab5f9e46f
Author: jiangphcn <ji...@cn.ibm.com>
AuthorDate: Thu Jun 21 16:10:06 2018 +0800
Address Ilya's comments
- Use couch_epi:any/5 to call index verification func
- swap "stop_couch/1" and "remove files"
- refactor reason for throw bad_request
- directly use meck:unload/0
COUCHDB-3326
---
src/couch/src/couch_db.erl | 44 +++--------------
src/couch/src/couch_db_plugin.erl | 6 +++
src/couch/test/couch_bt_engine_upgrade_tests.erl | 4 +-
.../src/couch_index_plugin_couch_db.erl | 5 ++
src/couch_mrview/src/couch_mrview_index.erl | 55 ++++++++--------------
src/couch_mrview/src/couch_mrview_util.erl | 5 +-
.../test/couch_mrview_purge_docs_fabric_tests.erl | 2 +-
.../test/couch_mrview_purge_docs_tests.erl | 25 +++++++---
.../src/cpse_test_purge_bad_checkpoints.erl | 49 ++-----------------
src/mem3/src/mem3_epi.erl | 3 +-
.../src/mem3_plugin_couch_db.erl} | 8 ++--
src/mem3/src/mem3_rep.erl | 49 ++++++++++---------
12 files changed, 96 insertions(+), 159 deletions(-)
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 2645ea2..2a3eb85 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -81,7 +81,6 @@
get_minimum_purge_seq/1,
purge_client_exists/3,
- get_purge_client_fun/2,
update_doc/3,
update_doc/4,
@@ -451,6 +450,7 @@ get_minimum_purge_seq(#db{} = Db) ->
{start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
],
{ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
+
FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of
true -> MinIdxSeq;
false -> erlang:max(0, PurgeSeq - PurgeInfosLimit)
@@ -475,56 +475,26 @@ purge_client_exists(DbName, DocId, Props) ->
LagThreshold = NowSecs - LagWindow,
try
- CheckFun = get_purge_client_fun(DocId, Props),
- Exists = CheckFun(Props),
+ Exists = couch_db_plugin:is_valid_purge_client(Props),
if not Exists -> ok; true ->
Updated = couch_util:get_value(<<"updated_on">>, Props),
if is_integer(Updated) and Updated > LagThreshold -> ok; true ->
Diff = NowSecs - Updated,
- Fmt1 = "Purge checkpoint '~s' not updated in ~p seconds",
- couch_log:error(Fmt1, [DocId, Diff])
+ Fmt2 = "Purge checkpoint '~s' not updated in ~p seconds",
+ couch_log:error(Fmt2, [DocId, Diff])
end
end,
Exists
catch _:_ ->
% If we fail to check for a client we have to assume that
% it exists.
- Fmt2 = "Failed to check purge checkpoint using
- document '~p' on database ~p",
- couch_log:error(Fmt2, [DbName, DocId]),
+ Fmt3 = "Failed to check purge checkpoint using
+ document '~p' in database ~p",
+ couch_log:error(Fmt3, [DbName, DocId]),
true
end.
-get_purge_client_fun(DocId, Props) ->
- M0 = couch_util:get_value(<<"verify_module">>, Props),
- M = try
- binary_to_existing_atom(M0, latin1)
- catch error:badarg ->
- Fmt1 = "Missing index module '~p' for purge checkpoint '~s'",
- couch_log:error(Fmt1, [M0, DocId]),
- throw(failed)
- end,
-
- F0 = couch_util:get_value(<<"verify_function">>, Props),
- try
- F = binary_to_existing_atom(F0, latin1),
- case erlang:function_exported(M, F, 1) of
- true ->
- fun M:F/1;
- false ->
- Fmt2 = "Missing exported function '~p' in '~p'
- for purge checkpoint '~s'",
- couch_log:error(Fmt2, [F0, M0, DocId]),
- throw(failed)
- end
- catch error:badarg ->
- Fmt3 = "Missing function '~p' in '~p' for purge checkpoint '~s'",
- couch_log:error(Fmt3, [F0, M0, DocId]),
- throw(failed)
- end.
-
-
set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl
index 8163256..c0bcc2f 100644
--- a/src/couch/src/couch_db_plugin.erl
+++ b/src/couch/src/couch_db_plugin.erl
@@ -18,6 +18,7 @@
after_doc_read/2,
validate_docid/1,
check_is_admin/1,
+ is_valid_purge_client/1,
on_compact/2,
on_delete/2
]).
@@ -57,6 +58,11 @@ check_is_admin(Db) ->
%% callbacks return true only if it specifically allow the given Id
couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []).
+is_valid_purge_client(Props) ->
+ Handle = couch_epi:get_handle(?SERVICE_ID),
+ %% callbacks return true only if it specifically allow the given Id
+ couch_epi:any(Handle, ?SERVICE_ID, is_valid_purge_client, [Props], []).
+
on_compact(DbName, DDocs) ->
Handle = couch_epi:get_handle(?SERVICE_ID),
couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []).
diff --git a/src/couch/test/couch_bt_engine_upgrade_tests.erl b/src/couch/test/couch_bt_engine_upgrade_tests.erl
index 6aef366..8c748f8 100644
--- a/src/couch/test/couch_bt_engine_upgrade_tests.erl
+++ b/src/couch/test/couch_bt_engine_upgrade_tests.erl
@@ -37,10 +37,10 @@ setup() ->
teardown({Ctx, Paths}) ->
+ test_util:stop_couch(Ctx),
lists:foreach(fun(Path) ->
file:delete(Path)
- end, Paths),
- test_util:stop_couch(Ctx).
+ end, Paths).
upgrade_test_() ->
diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl
index 937f7c8..5d4a6ac 100644
--- a/src/couch_index/src/couch_index_plugin_couch_db.erl
+++ b/src/couch_index/src/couch_index_plugin_couch_db.erl
@@ -13,9 +13,14 @@
-module(couch_index_plugin_couch_db).
-export([
+ is_valid_purge_client/1,
on_compact/2
]).
+is_valid_purge_client(Props) ->
+ couch_mrview_index:verify_index_exists(Props).
+
+
on_compact(DbName, DDocs) ->
couch_mrview_index:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 7756f52..8ab29b9 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -220,45 +220,30 @@ index_file_exists(State) ->
verify_index_exists(Props) ->
- ShardDbName = couch_mrview_util:get_value_from_options(
- <<"dbname">>,
- Props
- ),
- DDocId = couch_mrview_util:get_value_from_options(
- <<"ddoc_id">>,
- Props
- ),
- SigInLocal = couch_mrview_util:get_value_from_options(
- <<"signature">>,
- Props
- ),
- case couch_db:open_int(ShardDbName, []) of
- {ok, Db} ->
- try
- DbName = mem3:dbname(couch_db:name(Db)),
- case ddoc_cache:open(DbName, DDocId) of
- {ok, DDoc} ->
+ try
+ Type = couch_util:get_value(<<"type">>, Props),
+ if Type =/= <<"mrview">> -> false; true ->
+ ShardDbName = couch_util:get_value(<<"dbname">>, Props),
+ DDocId = couch_util:get_value(<<"ddoc_id">>, Props),
+ couch_util:with_db(ShardDbName, fun(Db) ->
+ {ok, DesignDocs} = couch_db:get_design_docs(Db),
+ case lists:keyfind(DDocId, #full_doc_info.id, DesignDocs) of
+ #full_doc_info{} = DDocInfo ->
+ {ok, DDoc} = couch_db:open_doc_int(
+ Db, DDocInfo, [ejson_body]),
{ok, IdxState} = couch_mrview_util:ddoc_to_mrst(
- ShardDbName,
- DDoc
- ),
+ ShardDbName, DDoc),
IdxSig = IdxState#mrst.sig,
+ SigInLocal = couch_util:get_value(
+ <<"signature">>, Props),
couch_index_util:hexsig(IdxSig) == SigInLocal;
- _Else ->
+ false ->
false
end
- catch E:T ->
- Stack = erlang:get_stacktrace(),
- couch_log:error(
- "Error occurs when verifying existence of ~s/~s :: ~p ~p",
- [ShardDbName, DDocId, {E, T}, Stack]
- ),
- false
- after
- catch couch_db:close(Db)
- end;
- _ ->
- false
+ end)
+ end
+ catch _:_ ->
+ false
end.
@@ -305,8 +290,6 @@ update_local_purge_doc(Db, State, PSeq) ->
{<<"type">>, <<"mrview">>},
{<<"purge_seq">>, PSeq},
{<<"updated_on">>, NowSecs},
- {<<"verify_module">>, <<"couch_mrview_index">>},
- {<<"verify_function">>, <<"verify_index_exists">>},
{<<"dbname">>, get(db_name, State)},
{<<"ddoc_id">>, get(idx_name, State)},
{<<"signature">>, Sig}
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index b274961..a9ae661 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -44,14 +44,13 @@
get_local_purge_doc_id(Sig) ->
- Version = "v" ++ config:get("purge", "version", "1") ++ "-",
- ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Version ++ Sig).
+ ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig).
get_value_from_options(Key, Options) ->
case couch_util:get_value(Key, Options) of
undefined ->
- Reason = binary_to_list(Key) ++ " must exist in Options.",
+ Reason = <<"'", Key/binary, "' must exists in options.">>,
throw({bad_request, Reason});
Value -> Value
end.
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
index 99bfa9e..6b0d4d9 100644
--- a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
@@ -31,7 +31,7 @@ setup() ->
teardown(DbName) ->
- meck:unload(couch_mrview_index),
+ meck:unload(),
ok = fabric:delete_db(DbName, [?ADMIN_CTX]).
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
index d5bf9f6..a7fb34f 100644
--- a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
@@ -361,20 +361,19 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) ->
couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
_Result = run_query(Db1, []),
- % purge 150 documents
- PurgedDocsNum = 150,
+ % first purge 30 documents
+ PurgedDocsNum1 = 30,
IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
Id1 = docid(Id),
FDI1 = couch_db:get_full_doc_info(Db1, Id1),
Rev1 = get_rev(FDI1),
UUID1 = uuid(Id),
[{UUID1, Id1, [Rev1]} | CIdRevs]
- end, [], lists:seq(1, PurgedDocsNum)),
+ end, [], lists:seq(1, PurgedDocsNum1)),
{ok, _} = couch_db:purge_docs(Db1, IdsRevs),
- % run query again to reflect purge requests
- % to mrview
{ok, Db2} = couch_db:reopen(Db1),
+ % run query again to reflect purge request to mrview
_Result1 = run_query(Db2, []),
{ok, PurgedIdRevs} = couch_db:fold_purge_infos(
Db2,
@@ -383,9 +382,21 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) ->
[],
[]
),
- ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+ ?assertEqual(PurgedDocsNum1, length(PurgedIdRevs)),
+
+ % then purge 120 documents
+ PurgedDocsNum2 = 150,
+ IdsRevs2 = lists:foldl(fun(Id, CIdRevs) ->
+ Id1 = docid(Id),
+ FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+ Rev1 = get_rev(FDI1),
+ UUID1 = uuid(Id),
+ [{UUID1, Id1, [Rev1]} | CIdRevs]
+ end, [], lists:seq(PurgedDocsNum1 + 1, PurgedDocsNum2)),
+ {ok, _} = couch_db:purge_docs(Db2, IdsRevs2),
% run compaction to trigger pruning of purge tree
+ % only the first 30 purge requests are pruned
{ok, Db3} = couch_db:open_int(DbName, []),
{ok, _CompactPid} = couch_db:start_compact(Db3),
wait_compaction(DbName, "database", ?LINE),
@@ -401,7 +412,7 @@ test_purge_compact_for_stale_purge_cp_with_client(Db) ->
[],
[]
),
- ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2))
+ ?assertEqual(PurgedDocsNum2 - PurgedDocsNum1, length(PurgedIdRevs2))
end).
diff --git a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl
index 3d5edb1..b511e01 100644
--- a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl
+++ b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl
@@ -48,7 +48,7 @@ teardown_each(Db) ->
cpse_bad_purge_seq(Db1) ->
- Db2 = save_local_doc(Db1, <<"foo">>, ?MODULE, valid_fun),
+ Db2 = save_local_doc(Db1, <<"foo">>),
?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
ok = couch_db:set_purge_infos_limit(Db2, 5),
@@ -56,35 +56,8 @@ cpse_bad_purge_seq(Db1) ->
?assertEqual(1, couch_db:get_minimum_purge_seq(Db3)).
-cpse_bad_verify_mod(Db1) ->
- Db2 = save_local_doc(Db1, 2, [invalid_module], valid_fun),
- ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
-
- ok = couch_db:set_purge_infos_limit(Db2, 5),
- {ok, Db3} = couch_db:reopen(Db2),
- ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-
-
-cpse_bad_verify_fun(Db1) ->
- Db2 = save_local_doc(Db1, 2, ?MODULE, [invalid_function]),
- ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
-
- ok = couch_db:set_purge_infos_limit(Db2, 5),
- {ok, Db3} = couch_db:reopen(Db2),
- ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-
-
-cpse_verify_fun_throws(Db1) ->
- Db2 = save_local_doc(Db1, 2, ?MODULE, throw_fun),
- ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
-
- ok = couch_db:set_purge_infos_limit(Db2, 5),
- {ok, Db3} = couch_db:reopen(Db2),
- ?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-
-
cpse_verify_non_boolean(Db1) ->
- Db2 = save_local_doc(Db1, 2, ?MODULE, non_bool_fun),
+ Db2 = save_local_doc(Db1, 2),
?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)),
ok = couch_db:set_purge_infos_limit(Db2, 5),
@@ -92,30 +65,16 @@ cpse_verify_non_boolean(Db1) ->
?assertEqual(2, couch_db:get_minimum_purge_seq(Db3)).
-save_local_doc(Db1, PurgeSeq, Mod, Fun) ->
+save_local_doc(Db1, PurgeSeq) ->
{Mega, Secs, _} = os:timestamp(),
NowSecs = Mega * 1000000 + Secs,
Doc = couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE({[
- {<<"_id">>, <<"_local/purge-test-stuff-v1">>},
+ {<<"_id">>, <<"_local/purge-test-stuff">>},
{<<"purge_seq">>, PurgeSeq},
{<<"timestamp_utc">>, NowSecs},
- {<<"verify_module">>, Mod},
- {<<"verify_function">>, Fun},
{<<"verify_options">>, {[{<<"signature">>, <<"stuff">>}]}},
{<<"type">>, <<"test">>}
]}))),
{ok, _} = couch_db:update_doc(Db1, Doc, []),
{ok, Db2} = couch_db:reopen(Db1),
Db2.
-
-
-valid_fun(_Options) ->
- true.
-
-
-throw_fun(_Options) ->
- throw(failed).
-
-
-not_bool(_Options) ->
- ok.
diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl
index ebcd596..4bf2bf5 100644
--- a/src/mem3/src/mem3_epi.erl
+++ b/src/mem3/src/mem3_epi.erl
@@ -30,7 +30,8 @@ app() ->
providers() ->
[
- {chttpd_handlers, mem3_httpd_handlers}
+ {couch_db, mem3_plugin_couch_db},
+ {chttpd_handlers, mem3_httpd_handlers}
].
diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/mem3/src/mem3_plugin_couch_db.erl
similarity index 79%
copy from src/couch_index/src/couch_index_plugin_couch_db.erl
copy to src/mem3/src/mem3_plugin_couch_db.erl
index 937f7c8..f19f5eb 100644
--- a/src/couch_index/src/couch_index_plugin_couch_db.erl
+++ b/src/mem3/src/mem3_plugin_couch_db.erl
@@ -10,12 +10,12 @@
% License for the specific language governing permissions and limitations under
% the License.
--module(couch_index_plugin_couch_db).
+-module(mem3_plugin_couch_db).
-export([
- on_compact/2
+ is_valid_purge_client/1
]).
-on_compact(DbName, DDocs) ->
- couch_mrview_index:ensure_local_purge_docs(DbName, DDocs).
+is_valid_purge_client(Props) ->
+ mem3_rep:verify_purge_checkpoint(Props).
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 3f224cd..22a3f7a 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -122,32 +122,37 @@ make_local_id(SourceThing, TargetThing, Filter) ->
make_purge_id(SourceUUID, TargetUUID) ->
- Version = "v" ++ config:get("purge", "version", "1") ++ "-",
- ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mem3-" ++ Version ++
- ?b2l(SourceUUID) ++ "-" ++ ?b2l(TargetUUID)).
+ <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
verify_purge_checkpoint(Props) ->
- DbName = couch_util:get_value(<<"dbname">>, Props),
- SourceBin = couch_util:get_value(<<"source">>, Props),
- TargetBin = couch_util:get_value(<<"target">>, Props),
- Range = couch_util:get_value(<<"range">>, Props),
-
- Source = binary_to_existing_atom(SourceBin, latin1),
- Target = binary_to_existing_atom(TargetBin, latin1),
-
try
- Shards = mem3:shards(DbName),
- Nodes = lists:foldl(fun(Shard, Acc) ->
- case Shard#shard.range == Range of
- true -> [Shard#shard.node | Acc];
- false -> Acc
+ Type = couch_util:get_value(<<"type">>, Props),
+ if Type =/= <<"internal_replication">> -> false; true ->
+ DbName = couch_util:get_value(<<"dbname">>, Props),
+ SourceBin = couch_util:get_value(<<"source">>, Props),
+ TargetBin = couch_util:get_value(<<"target">>, Props),
+ Range = couch_util:get_value(<<"range">>, Props),
+
+ Source = binary_to_existing_atom(SourceBin, latin1),
+ Target = binary_to_existing_atom(TargetBin, latin1),
+
+ try
+ Shards = mem3:shards(DbName),
+ Nodes = lists:foldl(fun(Shard, Acc) ->
+ case Shard#shard.range == Range of
+ true -> [Shard#shard.node | Acc];
+ false -> Acc
+ end
+ end, [], mem3:shards(DbName)),
+ lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
+ catch
+ error:database_does_not_exist ->
+ false
end
- end, [], mem3:shards(DbName)),
- lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
- catch
- error:database_does_not_exist ->
- false
+ end
+ catch _:_ ->
+ false
end.
@@ -500,8 +505,6 @@ purge_cp_body(#acc{} = Acc, PurgeSeq) ->
{<<"type">>, <<"internal_replication">>},
{<<"updated_on">>, NowSecs},
{<<"purge_seq">>, PurgeSeq},
- {<<"verify_module">>, <<"mem3_rep">>},
- {<<"verify_function">>, <<"verify_purge_checkpoint">>},
{<<"dbname">>, Source#shard.dbname},
{<<"source">>, atom_to_binary(Source#shard.node, latin1)},
{<<"target">>, atom_to_binary(Target#shard.node, latin1)},