You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/16 20:00:02 UTC
[couchdb] 08/20: WIP - couch_db.erl
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 334af0165dc864b8aa235145cadee4fd465b0981
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Mar 16 14:55:51 2018 -0500
WIP - couch_db.erl
---
src/couch/src/couch_db.erl | 105 ++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 99 insertions(+), 6 deletions(-)
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index e27f632..3ef6ab0 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -50,6 +50,7 @@
get_user_ctx/1,
get_uuid/1,
get_purge_seq/1,
+ get_oldest_purge_seq/1,
get_purge_infos_limit/1,
is_db/1,
@@ -76,7 +77,9 @@
get_full_doc_infos/2,
get_missing_revs/2,
get_design_docs/1,
- load_purge_infos/2,
+ get_purge_infos/2,
+
+ get_minimum_purge_seq/1,
update_doc/3,
update_doc/4,
@@ -381,15 +384,107 @@ get_full_doc_infos(Db, Ids) ->
purge_docs(#db{main_pid = Pid}, UUIdsIdsRevs) ->
gen_server:call(Pid, {purge_docs, UUIdsIdsRevs});
--spec load_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
+-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
UUId :: binary(),
PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
Id :: binary(),
Rev :: {non_neg_integer(), binary()}.
-load_purge_infos(Db, UUIDs) ->
+get_purge_infos(Db, UUIDs) ->
couch_db_engine:load_purge_infos(Db, UUIDs).
+% Return the lowest purge sequence whose purge infos still have to be kept.
+%
+% The starting point is PurgeSeq - PurgeInfosLimit. Every local doc whose
+% id starts with ?LOCAL_DOC_PREFIX ++ "purge-" is then inspected:
+%   * an integer "purge_seq" at or above the starting point changes nothing;
+%   * an integer "purge_seq" below it lowers the result, but only while
+%     purge_client_exists/3 reports that the client is still there;
+%   * anything else is logged and pins the result to the oldest purge
+%     sequence.
+% The result is never greater than max(0, PurgeSeq - PurgeInfosLimit).
+get_minimum_purge_seq(#db{} = Db) ->
+    % get_purge_seq/1 and get_oldest_purge_seq/1 return {ok, Seq}, so the
+    % tuples have to be unwrapped before any arithmetic is done on them.
+    {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+    {ok, OldestPurgeSeq} = couch_db:get_oldest_purge_seq(Db),
+    PurgeInfosLimit = couch_db:get_purge_infos_limit(Db),
+    DbName = couch_db:name(Db),
+    Threshold = PurgeSeq - PurgeInfosLimit,
+
+    FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
+        ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
+        case ClientSeq of
+            CS when is_integer(CS), CS >= Threshold ->
+                {ok, SeqAcc};
+            CS when is_integer(CS) ->
+                % purge_client_exists/3 names its first parameter DbName,
+                % so hand it the name rather than the #db{} record.
+                case purge_client_exists(DbName, DocId, Props) of
+                    true -> {ok, erlang:min(CS, SeqAcc)};
+                    false -> {ok, SeqAcc}
+                end;
+            _ ->
+                % If there's a broken doc we have to keep every
+                % purge info until the doc is fixed or removed.
+                DocFmt = "Invalid purge doc '~s' with purge_seq '~w'",
+                couch_log:error(DocFmt, [DocId, ClientSeq]),
+                {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
+        end
+    end,
+    % $. is the byte right after $-, so "purge." bounds the "purge-" ids.
+    % NOTE(review): assumes end_key_gt is an exclusive upper bound - confirm.
+    Opts = [
+        {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")},
+        {end_key_gt, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge.")}
+    ],
+    {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, Threshold, Opts),
+    FinalSeq = case MinIdxSeq < Threshold of
+        true -> MinIdxSeq;
+        false -> erlang:max(0, Threshold)
+    end,
+    % Log a warning if we've got a purge sequence exceeding the
+    % configured threshold. A case is used instead of a single-clause
+    % `if`, which would raise if_clause whenever the condition is false.
+    case FinalSeq < Threshold of
+        true ->
+            WarnFmt = "The purge sequence for '~s' exceeds configured threshold",
+            couch_log:warning(WarnFmt, [DbName]);
+        false ->
+            ok
+    end,
+    FinalSeq.
+
+
+% Check whether the client that owns a purge checkpoint doc still exists.
+%
+% The verification fun named by the doc (see get_purge_client_fun/2) is
+% called to decide; DocId is only used for logging. When the client
+% exists, an error is also logged if the doc's "updated_on" value is not
+% an integer newer than now minus "index_lag_warn_seconds".
+%
+% Returns whatever the verification fun returned, or true if anything in
+% the check raised.
+purge_client_exists(DbName, DocId, Props) ->
+    % Warn about clients that have not updated their purge
+    % checkpoints in the last "index_lag_warn_seconds"
+    LagWindow = config:get_integer(
+        "purge", "index_lag_warn_seconds", 86400), % Default 24 hours
+
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    LagThreshold = NowSecs - LagWindow,
+
+    try
+        CheckFun = get_purge_client_fun(DocId, Props),
+        % get_purge_client_fun/2 builds a fun of arity 2.
+        % NOTE(review): (DbName, Props) is the assumed argument pair -
+        % confirm against the verify functions the checkpoint docs name.
+        Exists = CheckFun(DbName, Props),
+        case Exists of
+            false ->
+                ok;
+            _ ->
+                warn_if_purge_checkpoint_stale(
+                    DocId, Props, NowSecs, LagThreshold)
+        end,
+        Exists
+    catch _:_ ->
+        % If we fail to check for a client we have to assume that
+        % it exists.
+        true
+    end.
+
+
+% Log an error unless the checkpoint's "updated_on" is an integer that is
+% newer than LagThreshold. Always returns the logger's result or ok.
+warn_if_purge_checkpoint_stale(DocId, Props, NowSecs, LagThreshold) ->
+    case couch_util:get_value(<<"updated_on">>, Props) of
+        % Comma-separated guard tests: `A and B > C` would parse as
+        % `(A and B) > C` and could never succeed for an integer.
+        Updated when is_integer(Updated), Updated > LagThreshold ->
+            ok;
+        Updated when is_integer(Updated) ->
+            couch_log:error(
+                "Purge checkpoint '~s' not updated in ~p seconds",
+                [DocId, NowSecs - Updated]);
+        Updated ->
+            couch_log:error(
+                "Purge checkpoint '~s' has invalid updated_on '~w'",
+                [DocId, Updated])
+    end.
+
+
+% Resolve the verification fun named by a purge checkpoint doc.
+%
+% Reads "verify_module" and "verify_function" from Props and returns
+% fun Module:Function/2. Both values must be binaries naming atoms that
+% already exist; no new atoms are ever created from doc contents.
+% Logs an error and throws `failed` if either lookup fails. DocId is only
+% used in the log messages.
+get_purge_client_fun(DocId, Props) ->
+    M0 = couch_util:get_value(<<"verify_module">>, Props),
+    % Bind the result of the try expression: a variable bound inside a
+    % try body is unsafe to use after it.
+    M = try
+        binary_to_existing_atom(M0, latin1)
+    catch error:badarg ->
+        % ~p rather than ~s: M0 is logged exactly when it may not be a
+        % string-like term (missing key, number, nested object, ...).
+        Fmt1 = "Missing index module '~p' for purge checkpoint '~s'",
+        couch_log:error(Fmt1, [M0, DocId]),
+        throw(failed)
+    end,
+
+    F0 = couch_util:get_value(<<"verify_function">>, Props),
+    F = try
+        binary_to_existing_atom(F0, latin1)
+    catch error:badarg ->
+        Fmt2 = "Missing function '~p' in '~p' for purge checkpoint '~s'",
+        couch_log:error(Fmt2, [F0, M0, DocId]),
+        throw(failed)
+    end,
+    fun M:F/2.
+
+
set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
@@ -418,9 +513,7 @@ get_purge_seq(#db{}=Db) ->
{ok, couch_db_engine:get_purge_seq(Db)}.
get_oldest_purge_seq(#db{}=Db) ->
- {ok, StartSeq} = get_purge_seq(Db),
- FoldFun = fun({_UUId, PurgeSeq, _, _}, _) -> {stop, PurgeSeq} end,
- fold_purge_infos(Db, StartSeq, FoldFun, StartSeq).
+ {ok, couch_db_engine:get_oldest_purge_seq(Db)}.
get_purge_infos_limit(#db{}=Db) ->
couch_db_engine:get_purge_infos_limit(Db).
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.