You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/03/16 19:59:57 UTC
[couchdb] 03/20: WIP - couch_db_updater
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch COUCHDB-3326-clustered-purge-davisp-refactor
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 63d39c2ef66f83adc96f5c3133f1893014227a33
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Mar 14 13:17:58 2018 -0500
WIP - couch_db_updater
---
src/couch/src/couch_db_updater.erl | 243 ++++++++++++++++++++++++-------------
1 file changed, 162 insertions(+), 81 deletions(-)
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 79567e9..ce0d45f 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -93,79 +93,37 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2, idle_limit()};
-handle_call({purge_docs, _IdRevs}, _From,
- #db{compactor_pid=Pid}=Db) when Pid /= nil ->
- {reply, {error, purge_during_compaction}, Db, idle_limit()};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
- DocIds = [Id || {Id, _Revs} <- IdRevs],
- OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
- NewDocInfos = lists:flatmap(fun
- ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
- case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, [] = _RemovedRevs} -> % no change
- [];
- {NewTree, RemovedRevs} ->
- NewFDI = FDI#full_doc_info{rev_tree = NewTree},
- [{FDI, NewFDI, RemovedRevs}]
- end;
- ({_, not_found}) ->
- []
- end, lists:zip(IdRevs, OldDocInfos)),
-
- InitUpdateSeq = couch_db_engine:get_update_seq(Db),
- InitAcc = {InitUpdateSeq, [], []},
- FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
- #full_doc_info{
- id = Id,
- rev_tree = OldTree
- } = OldFDI,
- {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
- {NewFDIAcc, NewSeqAcc} = case OldTree of
- [] ->
- % If we purged every #leaf{} in the doc record
- % then we're removing it completely from the
- % database.
- FDIAcc;
- _ ->
- % Its possible to purge the #leaf{} that contains
- % the update_seq where this doc sits in the update_seq
- % sequence. Rather than do a bunch of complicated checks
- % we just re-label every #leaf{} and reinsert it into
- % the update_seq sequence.
- {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
- (_RevId, Leaf, leaf, InnerSeqAcc) ->
- {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
- (_RevId, Value, _Type, InnerSeqAcc) ->
- {Value, InnerSeqAcc}
- end, SeqAcc0, OldTree),
-
- NewFDI = OldFDI#full_doc_info{
- update_seq = SeqAcc1,
- rev_tree = NewTree
- },
-
- {[NewFDI | FDIAcc], SeqAcc1}
- end,
- NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
- {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
- end, InitAcc, NewDocInfos),
-
- {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
-
- % We need to only use the list of #full_doc_info{} records
- % that we have actually changed due to a purge.
- PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
- Pairs = pair_purge_info(PreviousFDIs, FDIs),
-
- {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
-
+handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
- couch_event:notify(Db#db.name, updated),
+ {reply, ok, Db2};
+
+handle_call({purge_docs, PurgeReqs0}, _From, Db) ->
+ % First filter out any purge requests we've already
+ % processed.
+ UUIDs = [UUID || {UUID, _Id, _Revs} <- PurgeReqs0],
+ {ok, PurgeInfos} = couch_db:load_purge_infos(Db, UUIDs),
+ PurgeReqs = lists:foldr(fun
+ ({not_found, PReq}, Acc) -> [PReq | Acc];
+ ({{_, _, _, _}, _}, Acc) -> Acc
+ end, lists:zip(PurgeInfos, PurgeReqs0)),
+
+ % Process any remaining purge requests
+ Ids = [Id || {_UUID, Id, _Revs} <- PurgeReqs],
+ DocInfos = couch_db_engine:open_docs(Db, Ids),
+ UpdateSeq = couch_db_engine:get_update_seq(Db),
+ PurgeSeq = couch_db_engine:get_purge_seq(Db),
+
+ InitAcc = {[], [], []},
+ {Pairs, PInfos, Replies} = purge_docs(
+ Db, PurgeReqs, DocInfos, UpdateSeq, PurgeSeq, InitAcc),
- PurgeSeq = couch_db_engine:get_purge_seq(Db2),
- {reply, {ok, PurgeSeq, PurgedIdRevs}, Db2, idle_limit()};
+ Db2 = if Pairs == [] -> Db; true ->
+ {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos),
+ ok = gen_server:call(couch_server, {db_updated, Db1}, infinity),
+ couch_event:notify(Db1#db.name, updated)
+ end,
+ {reply, {ok, Replies}, Db2};
handle_call(Msg, From, Db) ->
case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
@@ -645,7 +603,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
Pairs = pair_write_info(OldDocLookups, IndexFDIs),
LocalDocs2 = update_local_doc_revs(LocalDocs),
- {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+ {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
WriteCount = length(IndexFDIs),
couch_stats:increment_counter([couchdb, document_inserts],
@@ -691,6 +649,138 @@ update_local_doc_revs(Docs) ->
end, Docs).
+purge_docs([], [], _USeq, _PSeq, {Pairs, PInfos, Replies}) ->
+ {lists:reverse(Pairs), lists:reverse(PInfos), lists:reverse(Replies)};
+
+purge_docs([Req | RestReqs], [FDI | RestInfos], USeq, PSeq, Acc) ->
+ {UUID, DocId, Revs} = Req,
+ {Pair, RemovedRevs, NewUSeq} = case FDI of
+ #full_doc_info{rev_tree = Tree} ->
+ case couch_key_tree:remove_leafs(Tree, Revs) of
+ {_, []} ->
+ % No change
+ {{not_found, not_found}, [], USeq};
+ {[], Removed} ->
+ % Completely purged
+ {{FDI, not_found}, Removed, USeq};
+ {NewTree, Removed} ->
+ % It's possible to purge the #leaf{} that contains
+ % the update_seq where this doc sits in the
+ % update_seq sequence. Rather than do a bunch of
+ % complicated checks we just re-label every #leaf{}
+ % and reinsert it into the update_seq sequence.
+ {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, SeqAcc) ->
+ {Leaf#leaf{seq = SeqAcc + 1},
+ SeqAcc + 1};
+ (_RevId, Value, _Type, SeqAcc) ->
+ {Value, SeqAcc}
+ end, USeq, NewTree),
+
+ NewFDI = FDI#full_doc_info{
+ update_seq = NewUpdateSeq,
+ rev_tree = NewTree2
+ },
+ {{FDI, NewFDI}, Removed, NewUpdateSeq}
+ end;
+ not_found ->
+ {{not_found, not_found}, [], USeq}
+ end,
+ {Pairs, PInfos, Replies} = Acc,
+ NewAcc = {
+ [Pair | Pairs],
+ [{PSeq, UUID, DocId, Revs} | PInfos],
+ [{ok, RemovedRevs} | Replies]
+ },
+ purge_docs(RestReqs, RestInfos, NewUSeq, PSeq + 1, NewAcc).
+
+
+% find purge seq such that all purge requests that happen before or
+% during it can be removed from purge trees
+get_disposable_purge_seq(#db{name=DbName} = Db) ->
+ PSeq = couch_db_engine:get_purge_seq(Db),
+ OldestPSeq = couch_db_engine:get_oldest_purge_seq(Db),
+ PDocsLimit = couch_db_engine:get_purged_docs_limit(Db),
+ ExpectedDispPSeq = PSeq - PDocsLimit,
+ % client's purge_seq can be up to "allowed_purge_seq_lag"
+ % behind ExpectedDispPSeq
+ AllowedPSeqLag = config:get_integer("purge", "allowed_purge_seq_lag", 100),
+ ClientAllowedMinPSeq = ExpectedDispPSeq - AllowedPSeqLag,
+ DisposablePSeq = if OldestPSeq > ClientAllowedMinPSeq ->
+ % DisposablePSeq is the last pseq we can remove;
+ % it should be one less than OldestPSeq when #purges is within limit
+ OldestPSeq - 1;
+ true ->
+ % Find the smallest checkpointed purge_seq among clients
+ V = "v" ++ config:get("purge", "version", "1") ++ "-",
+ Opts = [
+ {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-" ++ V)},
+ {end_key_gt, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge1")}
+ ],
+ FoldFun = fun(#doc{id=DocID, body={Props}}, MinPSeq) ->
+ ClientPSeq = couch_util:get_value(<<"purge_seq">>, Props),
+ MinPSeq2 = if ClientPSeq >= ClientAllowedMinPSeq ->
+ erlang:min(MinPSeq, ClientPSeq);
+ true ->
+ case check_client_exists(DbName, DocID, Props) of
+ true -> erlang:min(MinPSeq, ClientPSeq);
+ false -> MinPSeq % ignore nonexistent clients
+ end
+ end,
+ {ok, MinPSeq2}
+ end,
+ {ok, ClientPSeq} = couch_db_engine:fold_local_docs(
+ Db, FoldFun, PSeq, Opts),
+ erlang:min(ClientPSeq, ExpectedDispPSeq)
+ end,
+ DisposablePSeq.
+
+
+check_client_exists(DbName, DocID, Props) ->
+ % will warn about clients that have not
+ % checkpointed for more than "allowed_purge_time_lag"
+ AllowedPTimeLag = config:get_integer("purge",
+ "allowed_purge_time_lag", 86400), % secs in 1 day
+ M0 = couch_util:get_value(<<"verify_module">>, Props),
+ F0 = couch_util:get_value(<<"verify_function">>, Props),
+ M = binary_to_atom(M0, latin1),
+ F = binary_to_atom(F0, latin1),
+ {A} = couch_util:get_value(<<"verify_options">>, Props),
+ ClientExists = try erlang:apply(M, F, [A]) of
+ true ->
+ % warn if we haven't heard from this client for more than AllowedPTimeLag
+ ClientTime = ?b2l(couch_util:get_value(<<"timestamp_utc">>, Props)),
+ {ok, [Y, Mon, D, H, Min, S], [] }=
+ io_lib:fread("~4d-~2d-~2dT~2d:~2d:~2dZ", ClientTime),
+ SecsClient = calendar:datetime_to_gregorian_seconds(
+ {{Y, Mon, D}, {H, Min, S}}),
+ SecsNow = calendar:datetime_to_gregorian_seconds(
+ calendar:now_to_universal_time(os:timestamp())),
+ if SecsClient + AllowedPTimeLag > SecsNow -> ok; true ->
+ couch_log:warning(
+ "Client: ~p hasn't processed purge requests for more than"
+ " ~p secs. Check this client, as it prevents compaction of "
+ "purge trees on db:~p.", [A, AllowedPTimeLag, DbName]
+ )
+ end,
+ true;
+ false ->
+ couch_log:warning(
+ "Client ~p doesn't exist, "
+ "but its checkpoint purge doc: ~p is still available. "
+ "Remove this doc from: ~p", [A, DocID, DbName]
+ ),
+ false
+ catch
+ error:Error ->
+ couch_log:error(
+ "error in evaluating if client: ~p exists: ~p", [A, Error]
+ ),
+ false
+ end,
+ ClientExists.
+
+
commit_data(Db) ->
commit_data(Db, false).
@@ -720,15 +810,6 @@ pair_write_info(Old, New) ->
end, New).
-pair_purge_info(Old, New) ->
- lists:map(fun(OldFDI) ->
- case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
- #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
- false -> {OldFDI, not_found}
- end
- end, Old).
-
-
get_meta_body_size(Meta) ->
{ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta),
ExternalSize.
--
To stop receiving notification emails like this one, please contact
davisp@apache.org.