You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/16 19:59:57 UTC

[couchdb] 03/20: WIP - couch_db_updater

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 63d39c2ef66f83adc96f5c3133f1893014227a33
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Mar 14 13:17:58 2018 -0500

    WIP - couch_db_updater
---
 src/couch/src/couch_db_updater.erl | 243 ++++++++++++++++++++++++-------------
 1 file changed, 162 insertions(+), 81 deletions(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 79567e9..ce0d45f 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -93,79 +93,37 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
     {reply, ok, Db2, idle_limit()};
 
-handle_call({purge_docs, _IdRevs}, _From,
-        #db{compactor_pid=Pid}=Db) when Pid /= nil ->
-    {reply, {error, purge_during_compaction}, Db, idle_limit()};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
-    DocIds = [Id || {Id, _Revs} <- IdRevs],
-    OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
-    NewDocInfos = lists:flatmap(fun
-        ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
-            case couch_key_tree:remove_leafs(Tree, Revs) of
-                {_, [] = _RemovedRevs} -> % no change
-                    [];
-                {NewTree, RemovedRevs} ->
-                    NewFDI = FDI#full_doc_info{rev_tree = NewTree},
-                    [{FDI, NewFDI, RemovedRevs}]
-            end;
-        ({_, not_found}) ->
-            []
-    end, lists:zip(IdRevs, OldDocInfos)),
-
-    InitUpdateSeq = couch_db_engine:get_update_seq(Db),
-    InitAcc = {InitUpdateSeq, [], []},
-    FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
-        #full_doc_info{
-            id = Id,
-            rev_tree = OldTree
-        } = OldFDI,
-        {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
-        {NewFDIAcc, NewSeqAcc} = case OldTree of
-            [] ->
-                % If we purged every #leaf{} in the doc record
-                % then we're removing it completely from the
-                % database.
-                FDIAcc;
-            _ ->
-                % Its possible to purge the #leaf{} that contains
-                % the update_seq where this doc sits in the update_seq
-                % sequence. Rather than do a bunch of complicated checks
-                % we just re-label every #leaf{} and reinsert it into
-                % the update_seq sequence.
-                {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
-                    (_RevId, Leaf, leaf, InnerSeqAcc) ->
-                        {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
-                    (_RevId, Value, _Type, InnerSeqAcc) ->
-                        {Value, InnerSeqAcc}
-                end, SeqAcc0, OldTree),
-
-                NewFDI = OldFDI#full_doc_info{
-                    update_seq = SeqAcc1,
-                    rev_tree = NewTree
-                },
-
-                {[NewFDI | FDIAcc], SeqAcc1}
-        end,
-        NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
-        {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
-    end, InitAcc, NewDocInfos),
-
-    {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
-
-    % We need to only use the list of #full_doc_info{} records
-    % that we have actually changed due to a purge.
-    PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
-    Pairs = pair_purge_info(PreviousFDIs, FDIs),
-
-    {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
-
+% Persist a new purge infos limit through the storage engine and publish
+% the updated db handle to couch_server before replying.
+handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
+    {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
     ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
-    couch_event:notify(Db#db.name, updated),
+    % Reply with the idle timeout, like the set_revs_limit clause does.
+    {reply, ok, Db2, idle_limit()};
+
+% Apply a batch of purge requests, each a {UUID, DocId, Revs} tuple.
+% Replies with {ok, Replies} where Replies holds one {ok, RemovedRevs}
+% entry per request that was actually processed.
+handle_call({purge_docs, PurgeReqs0}, _From, Db) ->
+    % First filter out any purge requests we've already
+    % processed.
+    % NOTE(review): Replies below only covers the requests that survive
+    % this filter - confirm callers do not expect one reply per entry
+    % of PurgeReqs0.
+    UUIDs = [UUID || {UUID, _Id, _Revs} <- PurgeReqs0],
+    {ok, PurgeInfos} = couch_db:load_purge_infos(Db, UUIDs),
+    PurgeReqs = lists:foldr(fun
+        ({not_found, PReq}, Acc) -> [PReq | Acc];
+        ({{_, _, _, _}, _}, Acc) -> Acc
+    end, [], lists:zip(PurgeInfos, PurgeReqs0)),
+
+    % Processing any remaining purge requests
+    Ids = [Id || {_UUID, Id, _Revs} <- PurgeReqs],
+    DocInfos = couch_db_engine:open_docs(Db, Ids),
+    UpdateSeq = couch_db_engine:get_update_seq(Db),
+    PurgeSeq = couch_db_engine:get_purge_seq(Db),
+
+    InitAcc = {[], [], []},
+    {Pairs, PInfos, Replies} = purge_docs(
+            PurgeReqs, DocInfos, UpdateSeq, PurgeSeq, InitAcc),
 
-    PurgeSeq = couch_db_engine:get_purge_seq(Db2),
-    {reply, {ok, PurgeSeq, PurgedIdRevs}, Db2, idle_limit()};
+    Db2 = if Pairs == [] -> Db; true ->
+        {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos),
+        ok = gen_server:call(couch_server, {db_updated, Db1}, infinity),
+        couch_event:notify(Db1#db.name, updated),
+        % The branch must evaluate to the updated db handle, not to the
+        % return value of couch_event:notify/2.
+        Db1
+    end,
+    {reply, {ok, Replies}, Db2, idle_limit()};
 
 handle_call(Msg, From, Db) ->
     case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
@@ -645,7 +603,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
     Pairs = pair_write_info(OldDocLookups, IndexFDIs),
     LocalDocs2 = update_local_doc_revs(LocalDocs),
 
-    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+    {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
 
     WriteCount = length(IndexFDIs),
     couch_stats:increment_counter([couchdb, document_inserts],
@@ -691,6 +649,138 @@ update_local_doc_revs(Docs) ->
     end, Docs).
 
 
+% Walk the purge requests and their looked-up doc infos in lock step.
+% Accumulates, in request order:
+%   * {OldFDI, NewFDI} pairs ({not_found, not_found} when nothing changed,
+%     {OldFDI, not_found} when the doc was purged completely),
+%   * {PurgeSeq, UUID, DocId, Revs} purge infos,
+%   * {ok, RemovedRevs} replies.
+% NOTE(review): the first purge info is tagged with PSeq exactly as passed
+% in - confirm whether callers should pass the current or the next seq.
+purge_docs([], [], _USeq, _PSeq, {Pairs, PInfos, Replies}) ->
+    {lists:reverse(Pairs), lists:reverse(PInfos), lists:reverse(Replies)};
+
+purge_docs([Req | MoreReqs], [FDI | MoreFDIs], USeq, PSeq, Acc0) ->
+    {UUID, DocId, Revs} = Req,
+    {Pair, RemovedRevs, NextUSeq} = purge_revs_from_fdi(FDI, Revs, USeq),
+    {Pairs0, PInfos0, Replies0} = Acc0,
+    Acc1 = {
+        [Pair | Pairs0],
+        [{PSeq, UUID, DocId, Revs} | PInfos0],
+        [{ok, RemovedRevs} | Replies0]
+    },
+    purge_docs(MoreReqs, MoreFDIs, NextUSeq, PSeq + 1, Acc1).
+
+
+% Remove Revs from a single doc info. Returns {Pair, RemovedRevs, NewUSeq}.
+purge_revs_from_fdi(FDI, Revs, USeq) ->
+    case FDI of
+        #full_doc_info{rev_tree = Tree} ->
+            case couch_key_tree:remove_leafs(Tree, Revs) of
+                {_, []} ->
+                    % Nothing was removed
+                    {{not_found, not_found}, [], USeq};
+                {[], Gone} ->
+                    % Every leaf was removed, the doc disappears
+                    {{FDI, not_found}, Gone, USeq};
+                {Remaining, Gone} ->
+                    % The purged #leaf{} may be the one holding the
+                    % doc's position in the update sequence. Instead
+                    % of checking for that, every remaining #leaf{}
+                    % is given a fresh seq and the doc is reinserted
+                    % at the resulting update_seq.
+                    {Relabeled, LastSeq} = relabel_leaf_seqs(Remaining, USeq),
+                    Updated = FDI#full_doc_info{
+                        update_seq = LastSeq,
+                        rev_tree = Relabeled
+                    },
+                    {{FDI, Updated}, Gone, LastSeq}
+            end;
+        not_found ->
+            {{not_found, not_found}, [], USeq}
+    end.
+
+
+% Give each leaf of Tree the next sequence number after USeq.
+% Returns {NewTree, LastAssignedSeq}.
+relabel_leaf_seqs(Tree, USeq) ->
+    couch_key_tree:mapfold(fun
+        (_RevId, Leaf, leaf, Seq) ->
+            NextSeq = Seq + 1,
+            {Leaf#leaf{seq = NextSeq}, NextSeq};
+        (_RevId, Node, _Type, Seq) ->
+            {Node, Seq}
+    end, USeq, Tree).
+
+
+% Compute the purge seq up to and including which purge requests can be
+% removed from the purge trees.
+get_disposable_purge_seq(#db{name=DbName} = Db) ->
+    CurrentPSeq = couch_db_engine:get_purge_seq(Db),
+    OldestPSeq = couch_db_engine:get_oldest_purge_seq(Db),
+    Limit = couch_db_engine:get_purged_docs_limit(Db),
+    ExpectedPSeq = CurrentPSeq - Limit,
+    % A client's purge_seq may trail ExpectedPSeq by at most
+    % "allowed_purge_seq_lag"
+    MaxLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
+    MinAllowedPSeq = ExpectedPSeq - MaxLag,
+    case OldestPSeq > MinAllowedPSeq of
+        true ->
+            % The last removable pseq is the one just before the
+            % oldest stored purge seq
+            OldestPSeq - 1;
+        false ->
+            % Otherwise look for the smallest purge_seq checkpointed
+            % by any client in its local purge doc
+            Version = "v" ++ config:get("purge", "version", "1") ++ "-",
+            StartKey = list_to_binary(
+                ?LOCAL_DOC_PREFIX ++ "purge-" ++ Version),
+            EndKey = list_to_binary(?LOCAL_DOC_PREFIX ++ "purge1"),
+            FoldOpts = [{start_key, StartKey}, {end_key_gt, EndKey}],
+            Lowest = fun(#doc{id=DocID, body={Props}}, MinSoFar) ->
+                CheckpointPSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                % Checkpoints lagging too far behind only count when
+                % their client can still be verified to exist
+                Counts = CheckpointPSeq >= MinAllowedPSeq
+                    orelse check_client_exists(DbName, DocID, Props),
+                case Counts of
+                    true -> {ok, erlang:min(MinSoFar, CheckpointPSeq)};
+                    false -> {ok, MinSoFar}
+                end
+            end,
+            {ok, MinClientPSeq} = couch_db_engine:fold_local_docs(
+                Db, Lowest, CurrentPSeq, FoldOpts),
+            erlang:min(MinClientPSeq, ExpectedPSeq)
+    end.
+
+
+% Ask the verify callback named in a purge checkpoint doc whether its
+% client still exists.
+%   DbName - used in log messages only
+%   DocID  - id of the checkpoint doc, used in log messages only
+%   Props  - body properties of the checkpoint doc; "verify_module",
+%            "verify_function", "verify_options" and "timestamp_utc"
+%            are read from it
+% Returns true when the callback returns true, and false when it returns
+% false or when resolving/calling it raises an error.
+check_client_exists(DbName, DocID, Props) ->
+    % will warn about clients that have not
+    % checkpointed more than "allowed_purge_time_lag"
+    AllowedPTimeLag = config:get_integer("purge",
+        "allowed_purge_time_lag", 86400), % secs in 1 day
+    M0 = couch_util:get_value(<<"verify_module">>, Props),
+    F0 = couch_util:get_value(<<"verify_function">>, Props),
+    {A} = couch_util:get_value(<<"verify_options">>, Props),
+    % NOTE(review): module and function names come straight from the doc
+    % body; consider restricting them to a known set of verify callbacks.
+    ClientExists = try
+        % Atoms are never garbage collected, so never create new ones
+        % from doc content. Loading the module first makes sure the atoms
+        % of its functions exist before the function name is resolved.
+        M = binary_to_existing_atom(M0, latin1),
+        {module, M} = code:ensure_loaded(M),
+        F = binary_to_existing_atom(F0, latin1),
+        erlang:apply(M, F, [A])
+    of
+        true ->
+            % warn if we haven't heard of this client more than AllowedPTimeLag
+            ClientTime = ?b2l(couch_util:get_value(<<"timestamp_utc">>, Props)),
+            {ok, [Y, Mon, D, H, Min, S], [] }=
+                io_lib:fread("~4d-~2d-~2dT~2d:~2d:~2dZ", ClientTime),
+            SecsClient = calendar:datetime_to_gregorian_seconds(
+                {{Y, Mon, D}, {H, Min, S}}),
+            SecsNow = calendar:datetime_to_gregorian_seconds(
+                calendar:now_to_universal_time(os:timestamp())),
+            if SecsClient + AllowedPTimeLag > SecsNow -> ok; true ->
+                couch_log:warning(
+                    "Client: ~p hasn't processed purge requests for more than"
+                    " ~p secs. Check this client, as it prevents compaction of "
+                    "purge trees on db:~p.", [A, AllowedPTimeLag, DbName]
+                )
+            end,
+            true;
+        false ->
+            couch_log:warning(
+                "Client ~p doesn't exist, "
+                "but its checkpoint purge doc: ~p is still available. "
+                "Remove this doc from: ~p", [A, DocID, DbName]
+            ),
+            false
+    catch
+        error:Error ->
+            % Unknown module/function names and failing callbacks both
+            % end up here and are treated as "client does not exist"
+            couch_log:error(
+                "error in evaluating if client: ~p exists: ~p", [A, Error]
+            ),
+            false
+    end,
+    ClientExists.
+
+
 commit_data(Db) ->
     commit_data(Db, false).
 
@@ -720,15 +810,6 @@ pair_write_info(Old, New) ->
     end, New).
 
 
-pair_purge_info(Old, New) ->
-    lists:map(fun(OldFDI) ->
-        case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
-            #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
-            false -> {OldFDI, not_found}
-        end
-    end, Old).
-
-
 get_meta_body_size(Meta) ->
     {ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta),
     ExternalSize.

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.