You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/16 20:00:02 UTC

[couchdb] 08/20: WIP - couch_db.erl

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 334af0165dc864b8aa235145cadee4fd465b0981
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Mar 16 14:55:51 2018 -0500

    WIP - couch_db.erl
---
 src/couch/src/couch_db.erl | 105 ++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 99 insertions(+), 6 deletions(-)

diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index e27f632..3ef6ab0 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -50,6 +50,7 @@
     get_user_ctx/1,
     get_uuid/1,
     get_purge_seq/1,
+    get_oldest_purge_seq/1,
     get_purge_infos_limit/1,
 
     is_db/1,
@@ -76,7 +77,9 @@
     get_full_doc_infos/2,
     get_missing_revs/2,
     get_design_docs/1,
-    load_purge_infos/2,
+    get_purge_infos/2,
+
+    get_minimum_purge_seq/1,
 
     update_doc/3,
     update_doc/4,
@@ -381,15 +384,107 @@ get_full_doc_infos(Db, Ids) ->
 purge_docs(#db{main_pid = Pid}, UUIdsIdsRevs) ->
-    gen_server:call(Pid, {purge_docs, UUIdsIdsRevs});
+    % Hand the purge requests to the database's main process (main_pid).
+    % NOTE(review): presumably a list of {UUId, DocId, Revs}; confirm
+    % against the main process' handle_call({purge_docs, ...}).
+    % Terminated with '.': the next form is a -spec attribute, so a ';'
+    % here is a syntax error.
+    gen_server:call(Pid, {purge_docs, UUIdsIdsRevs}).
 
+%% Look up the stored purge info for each of the given purge request UUIDs
+%% by delegating to couch_db_engine:load_purge_infos/2. Per the spec, an
+%% element may be `not_found` (presumably for UUIDs with no recorded purge).
--spec load_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
+-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
     UUId :: binary(),
     PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
+    PurgeSeq :: non_neg_integer(),
     Id :: binary(),
     Rev :: {non_neg_integer(), binary()}.
--load_purge_infos(Db, UUIDs) ->
+get_purge_infos(Db, UUIDs) ->
     couch_db_engine:load_purge_infos(Db, UUIDs).
 
 
+%% Compute the lowest purge sequence that still has to be retained for Db.
+%%
+%% The baseline is PurgeSeq - PurgeInfosLimit (clamped at 0 in the result).
+%% Every purge checkpoint local doc (ids in the range
+%% [?LOCAL_DOC_PREFIX ++ "purge-", ?LOCAL_DOC_PREFIX ++ "purge.")) can lower it:
+%%   * an integer purge_seq below the baseline lowers it to that purge_seq,
+%%     but only while purge_client_exists/3 reports the client as existing;
+%%   * a missing or non-integer purge_seq is logged and pins the result at
+%%     the oldest stored purge seq.
+%% Logs a warning when the result falls below the baseline.
+get_minimum_purge_seq(#db{} = Db) ->
+    % get_purge_seq/1 and get_oldest_purge_seq/1 wrap their result in
+    % {ok, _}; get_purge_infos_limit/1 returns a bare value.
+    {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+    {ok, OldestPurgeSeq} = couch_db:get_oldest_purge_seq(Db),
+    PurgeInfosLimit = couch_db:get_purge_infos_limit(Db),
+    DbName = couch_db:name(Db),
+    % Baseline minimum when no checkpoint lags behind it.
+    InitMinSeq = PurgeSeq - PurgeInfosLimit,
+
+    FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
+        ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
+        case ClientSeq of
+            CS when is_integer(CS), CS >= InitMinSeq ->
+                % Checkpoint is at or above the baseline; no effect.
+                {ok, SeqAcc};
+            CS when is_integer(CS) ->
+                % Lagging checkpoint: only hold purge infos back for it
+                % while its client still exists.
+                case purge_client_exists(DbName, DocId, Props) of
+                    true -> {ok, erlang:min(CS, SeqAcc)};
+                    false -> {ok, SeqAcc}
+                end;
+            _ ->
+                % If there's a broken doc we have to keep every
+                % purge info until the doc is fixed or removed.
+                Fmt = "Invalid purge doc '~s' with purge_seq '~w'",
+                couch_log:error(Fmt, [DocId, ClientSeq]),
+                {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
+        end
+    end,
+    % ?LOCAL_DOC_PREFIX is a string, so concatenate with ++ (not +).
+    Opts = [
+        {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")},
+        {end_key_gt, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge.")}
+    ],
+    {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
+    FinalSeq = case MinIdxSeq < InitMinSeq of
+        true -> MinIdxSeq;
+        false -> erlang:max(0, InitMinSeq)
+    end,
+    % Log a warning if we've got a purge sequence exceeding the
+    % configured threshold. The 'false' branch is required: an 'if'
+    % without a matching clause raises if_clause.
+    case FinalSeq < InitMinSeq of
+        true ->
+            WarnFmt = "The purge sequence for '~s' exceeds configured threshold",
+            couch_log:warning(WarnFmt, [DbName]);
+        false ->
+            ok
+    end,
+    FinalSeq.
+
+
+%% Decide whether the purge client owning checkpoint doc DocId still exists.
+%%
+%% The check is delegated to the verify_module:verify_function/2 named in
+%% the checkpoint body (see get_purge_client_fun/2), called as
+%% Fun(DbName, Props). For an existing client, also warn when the
+%% checkpoint looks stale (see warn_if_purge_client_lagging/2).
+%% If the check itself fails, the client is assumed to exist (returns true)
+%% so that its checkpoint keeps holding purge infos back.
+purge_client_exists(DbName, DocId, Props) ->
+    try
+        CheckFun = get_purge_client_fun(DocId, Props),
+        % A non-boolean answer raises case_clause and lands in the
+        % catch below, i.e. it is treated as "exists".
+        case CheckFun(DbName, Props) of
+            true ->
+                warn_if_purge_client_lagging(DocId, Props),
+                true;
+            false ->
+                false
+        end
+    catch
+        throw:failed ->
+            % get_purge_client_fun/2 has already logged the reason.
+            true;
+        Class:Reason ->
+            % If we fail to check for a client we have to assume that
+            % it exists.
+            Fmt = "Failed to check purge checkpoint '~s': ~p:~p",
+            couch_log:error(Fmt, [DocId, Class, Reason]),
+            true
+    end.
+
+
+%% Warn about clients that have not updated their purge checkpoints in the
+%% last "index_lag_warn_seconds" (config section "purge", default 86400).
+%% "updated_on" is compared against the current time in whole seconds; a
+%% missing or non-integer value is logged rather than crashing.
+warn_if_purge_client_lagging(DocId, Props) ->
+    LagWindow = config:get_integer(
+            "purge", "index_lag_warn_seconds", 86400), % Default 24 hours
+
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    LagThreshold = NowSecs - LagWindow,
+
+    case couch_util:get_value(<<"updated_on">>, Props) of
+        Updated when is_integer(Updated), Updated > LagThreshold ->
+            ok;
+        Updated when is_integer(Updated) ->
+            Diff = NowSecs - Updated,
+            LagFmt = "Purge checkpoint '~s' not updated in ~p seconds",
+            couch_log:error(LagFmt, [DocId, Diff]);
+        Other ->
+            BadFmt = "Purge checkpoint '~s' has invalid updated_on '~w'",
+            couch_log:error(BadFmt, [DocId, Other])
+    end.
+
+
+%% Resolve the "verify_module" / "verify_function" fields of a purge
+%% checkpoint body into `fun Module:Function/2`.
+%%
+%% Only atoms that already exist are accepted (binary_to_existing_atom/2),
+%% so a checkpoint doc cannot grow the atom table. When either name cannot
+%% be resolved (missing field, non-binary value, or unknown atom), an error
+%% is logged and `failed` is thrown.
+get_purge_client_fun(DocId, Props) ->
+    M0 = couch_util:get_value(<<"verify_module">>, Props),
+    % Bind M from the try's value: a variable bound inside a try body is
+    % unsafe to use after the try.
+    M = try
+        binary_to_existing_atom(M0, latin1)
+    catch error:badarg ->
+        % ~p rather than ~s: M0 may be absent or a non-string JSON value.
+        Fmt1 = "Missing index module '~p' for purge checkpoint '~s'",
+        couch_log:error(Fmt1, [M0, DocId]),
+        throw(failed)
+    end,
+
+    F0 = couch_util:get_value(<<"verify_function">>, Props),
+    F = try
+        binary_to_existing_atom(F0, latin1)
+    catch error:badarg ->
+        Fmt2 = "Missing function '~p' in '~p' for purge checkpoint '~s'",
+        couch_log:error(Fmt2, [F0, M0, DocId]),
+        throw(failed)
+    end,
+    fun M:F/2.
+
+
 set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
     check_is_admin(Db),
     gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
@@ -418,9 +513,7 @@ get_purge_seq(#db{}=Db) ->
     {ok, couch_db_engine:get_purge_seq(Db)}.
 
 get_oldest_purge_seq(#db{}=Db) ->
-    {ok, StartSeq} = get_purge_seq(Db),
-    FoldFun = fun({_UUId, PurgeSeq, _, _}, _) -> {stop, PurgeSeq} end,
-    fold_purge_infos(Db, StartSeq, FoldFun, StartSeq).
+    % Ask the storage engine directly and wrap the answer as {ok, Seq},
+    % the same shape get_purge_seq/1 returns.
+    OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db),
+    {ok, OldestPurgeSeq}.
 
 get_purge_infos_limit(#db{}=Db) ->
+    % Returns the storage engine's purge infos limit as a bare value
+    % (unlike get_purge_seq/1, which wraps its result in {ok, _}).
     couch_db_engine:get_purge_infos_limit(Db).

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.