You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ji...@apache.org on 2018/07/23 03:45:59 UTC
[couchdb] 01/09: [02/10] Clustered Purge: Update single node APIs
This is an automated email from the ASF dual-hosted git repository.
jiangphcn pushed a commit to branch COUCHDB-3326-clustered-purge-pr5-implementation
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit a61aa84020ee83a2ccb6cb101202b0d15d499617
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Apr 24 12:24:10 2018 -0500
[02/10] Clustered Purge: Update single node APIs
This patch updates the single node API implementations for use with the
new clustered purge API. At the single node level the major change is to
store a history of purge requests that can then be consumed by various
other parts of the database system.
The simpler of the major areas to use this new functionality will be any
secondary indices. Rather than checking that only a single purge request
has occurred each secondary index will store a _local document
referencing its oldest processed purge request. During index updates
each secondary index implementation will process any new purge requests
and update its local doc checkpoint. In this way secondary indexes will
no longer be sensitive to reset when multiple purge requests are issued
against the database.
The two other major areas that will make use of the newly stored purge
request history are both of the anit-entropy mechanisms: read-repair and
internal replication.
Read-repair will use the purge request history to know when a node
should discard updates that have come from a node that has not yet
processed a purge request during internal replication. Otherwise
read-repair would effectively undo any purge replication that happened
"recently".
Internal replication will use the purge request history to be able to
mend any differences between shards. For instance, if a shard is down
when a purge request is issue against a cluster this process will pull
the purge request and apply it during internal replication. And
similarly any local purge requests will be applied on the target before
normal internal replication.
COUCHDB-3326
Co-authored-by: Mayya Sharipova <ma...@ca.ibm.com>
Co-authored-by: jiangphcn <ji...@cn.ibm.com>
---
src/couch/priv/stats_descriptions.cfg | 12 +++
src/couch/src/couch_db.erl | 157 +++++++++++++++++++++++++++--
src/couch/src/couch_db_plugin.erl | 6 ++
src/couch/src/couch_db_updater.erl | 185 +++++++++++++++++++---------------
src/couch/src/couch_httpd_db.erl | 23 +++--
5 files changed, 284 insertions(+), 99 deletions(-)
diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index f091978..bceb0ce 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -34,6 +34,10 @@
{type, counter},
{desc, <<"number of times a document was read from a database">>}
]}.
+{[couchdb, database_purges], [
+ {type, counter},
+ {desc, <<"number of times a database was purged">>}
+]}.
{[couchdb, db_open_time], [
{type, histogram},
{desc, <<"milliseconds required to open a database">>}
@@ -46,6 +50,10 @@
{type, counter},
{desc, <<"number of document write operations">>}
]}.
+{[couchdb, document_purges], [
+ {type, counter},
+ {desc, <<"number of document purge operations">>}
+]}.
{[couchdb, local_document_writes], [
{type, counter},
{desc, <<"number of _local document write operations">>}
@@ -74,6 +82,10 @@
{type, counter},
{desc, <<"number of clients for continuous _changes">>}
]}.
+{[couchdb, httpd, purge_requests], [
+ {type, counter},
+ {desc, <<"number of purge requests">>}
+]}.
{[couchdb, httpd_request_methods, 'COPY'], [
{type, counter},
{desc, <<"number of HTTP COPY requests">>}
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 3449274..59ccc6d 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -43,7 +43,6 @@
get_epochs/1,
get_filepath/1,
get_instance_start_time/1,
- get_last_purged/1,
get_pid/1,
get_revs_limit/1,
get_security/1,
@@ -51,12 +50,15 @@
get_user_ctx/1,
get_uuid/1,
get_purge_seq/1,
+ get_oldest_purge_seq/1,
+ get_purge_infos_limit/1,
is_db/1,
is_system_db/1,
is_clustered/1,
set_revs_limit/2,
+ set_purge_infos_limit/2,
set_security/2,
set_user_ctx/2,
@@ -75,6 +77,10 @@
get_full_doc_infos/2,
get_missing_revs/2,
get_design_docs/1,
+ get_purge_infos/2,
+
+ get_minimum_purge_seq/1,
+ purge_client_exists/3,
update_doc/3,
update_doc/4,
@@ -84,6 +90,7 @@
delete_doc/3,
purge_docs/2,
+ purge_docs/3,
with_stream/3,
open_write_stream/2,
@@ -97,6 +104,8 @@
fold_changes/4,
fold_changes/5,
count_changes_since/2,
+ fold_purge_infos/4,
+ fold_purge_infos/5,
calculate_start_seq/3,
owner_of/2,
@@ -369,8 +378,129 @@ get_full_doc_info(Db, Id) ->
get_full_doc_infos(Db, Ids) ->
couch_db_engine:open_docs(Db, Ids).
-purge_docs(#db{main_pid=Pid}, IdsRevs) ->
- gen_server:call(Pid, {purge_docs, IdsRevs}).
+purge_docs(Db, IdRevs) ->
+ purge_docs(Db, IdRevs, []).
+
+-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], [PurgeOption]) ->
+ {ok, [Reply]} when
+ UUId :: binary(),
+ Id :: binary(),
+ Rev :: {non_neg_integer(), binary()},
+ PurgeOption :: interactive_edit | replicated_changes,
+ Reply :: {ok, []} | {ok, [Rev]}.
+purge_docs(#db{main_pid = Pid} = Db, UUIDsIdsRevs, Options) ->
+ % Check here if any UUIDs already exist when
+ % we're not replicating purge infos
+ IsRepl = lists:member(replicated_changes, Options),
+ if IsRepl -> ok; true ->
+ UUIDs = [UUID || {UUID, _, _} <- UUIDsIdsRevs],
+ lists:foreach(fun(Resp) ->
+ if Resp == not_found -> ok; true ->
+ Fmt = "Duplicate purge info UIUD: ~s",
+ Reason = io_lib:format(Fmt, [element(2, Resp)]),
+ throw({badreq, Reason})
+ end
+ end, get_purge_infos(Db, UUIDs))
+ end,
+ increment_stat(Db, [couchdb, database_purges]),
+ gen_server:call(Pid, {purge_docs, UUIDsIdsRevs, Options}).
+
+-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
+ UUId :: binary(),
+ PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
+ PurgeSeq :: non_neg_integer(),
+ Id :: binary(),
+ Rev :: {non_neg_integer(), binary()}.
+get_purge_infos(Db, UUIDs) ->
+ couch_db_engine:load_purge_infos(Db, UUIDs).
+
+
+get_minimum_purge_seq(#db{} = Db) ->
+ PurgeSeq = couch_db_engine:get_purge_seq(Db),
+ OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db),
+ PurgeInfosLimit = couch_db_engine:get_purge_infos_limit(Db),
+
+ FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
+ case DocId of
+ <<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
+ ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
+ case ClientSeq of
+ CS when is_integer(CS), CS >= PurgeSeq - PurgeInfosLimit ->
+ {ok, SeqAcc};
+ CS when is_integer(CS) ->
+ case purge_client_exists(Db, DocId, Props) of
+ true -> {ok, erlang:min(CS, SeqAcc)};
+ false -> {ok, SeqAcc}
+ end;
+ _ ->
+ % If there's a broken doc we have to keep every
+ % purge info until the doc is fixed or removed.
+ Fmt = "Invalid purge doc '~s' on database ~p
+ with purge_seq '~w'",
+ DbName = couch_db:name(Db),
+ couch_log:error(Fmt, [DocId, DbName, ClientSeq]),
+ {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
+ end;
+ _ ->
+ {stop, SeqAcc}
+ end
+ end,
+ InitMinSeq = PurgeSeq - PurgeInfosLimit,
+ Opts = [
+ {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
+ ],
+ {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
+ FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of
+ true -> MinIdxSeq;
+ false -> erlang:max(0, PurgeSeq - PurgeInfosLimit)
+ end,
+ % Log a warning if we've got a purge sequence exceeding the
+ % configured threshold.
+ if FinalSeq >= (PurgeSeq - PurgeInfosLimit) -> ok; true ->
+ Fmt = "The purge sequence for '~s' exceeds configured threshold",
+ couch_log:warning(Fmt, [couch_db:name(Db)])
+ end,
+ FinalSeq.
+
+
+purge_client_exists(DbName, DocId, Props) ->
+ % Warn about clients that have not updated their purge
+ % checkpoints in the last "index_lag_warn_seconds"
+ LagWindow = config:get_integer(
+ "purge", "index_lag_warn_seconds", 86400), % Default 24 hours
+
+ {Mega, Secs, _} = os:timestamp(),
+ NowSecs = Mega * 1000000 + Secs,
+ LagThreshold = NowSecs - LagWindow,
+
+ try
+ Exists = couch_db_plugin:is_valid_purge_client(DbName, Props),
+ if not Exists -> ok; true ->
+ Updated = couch_util:get_value(<<"updated_on">>, Props),
+ if is_integer(Updated) and Updated > LagThreshold -> ok; true ->
+ Diff = NowSecs - Updated,
+ Fmt1 = "Purge checkpoint '~s' not updated in ~p seconds
+ in database ~p",
+ couch_log:error(Fmt1, [DocId, Diff, DbName])
+ end
+ end,
+ Exists
+ catch _:_ ->
+ % If we fail to check for a client we have to assume that
+ % it exists.
+ Fmt2 = "Failed to check purge checkpoint using
+ document '~p' in database ~p",
+ couch_log:error(Fmt2, [DbName, DocId]),
+ true
+ end.
+
+
+set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
+ check_is_admin(Db),
+ gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
+set_purge_infos_limit(_Db, _Limit) ->
+ throw(invalid_purge_infos_limit).
+
get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
Fun.
@@ -390,10 +520,13 @@ get_user_ctx(?OLD_DB_REC = Db) ->
?OLD_DB_USER_CTX(Db).
get_purge_seq(#db{}=Db) ->
- {ok, couch_db_engine:get_purge_seq(Db)}.
+ couch_db_engine:get_purge_seq(Db).
+
+get_oldest_purge_seq(#db{}=Db) ->
+ couch_db_engine:get_oldest_purge_seq(Db).
-get_last_purged(#db{}=Db) ->
- {ok, couch_db_engine:get_last_purged(Db)}.
+get_purge_infos_limit(#db{}=Db) ->
+ couch_db_engine:get_purge_infos_limit(Db).
get_pid(#db{main_pid = Pid}) ->
Pid.
@@ -471,7 +604,8 @@ get_db_info(Db) ->
],
{ok, InfoList}.
-get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
+get_design_docs(#db{name = <<"shards/", _:18/binary, DbFullName/binary>>}) ->
+ DbName = ?l2b(filename:rootname(filename:basename(?b2l(DbFullName)))),
{_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
receive {'DOWN', Ref, _, _, Response} ->
Response
@@ -481,7 +615,6 @@ get_design_docs(#db{} = Db) ->
{ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
{ok, lists:reverse(Docs)}.
-
check_is_admin(#db{user_ctx=UserCtx}=Db) ->
case is_admin(Db) of
true -> ok;
@@ -1406,6 +1539,14 @@ fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
+fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
+ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []).
+
+
+fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
+ couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
+
+
count_changes_since(Db, SinceSeq) ->
couch_db_engine:count_changes_since(Db, SinceSeq).
diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl
index 8163256..e25866e 100644
--- a/src/couch/src/couch_db_plugin.erl
+++ b/src/couch/src/couch_db_plugin.erl
@@ -18,6 +18,7 @@
after_doc_read/2,
validate_docid/1,
check_is_admin/1,
+ is_valid_purge_client/2,
on_compact/2,
on_delete/2
]).
@@ -57,6 +58,11 @@ check_is_admin(Db) ->
%% callbacks return true only if it specifically allow the given Id
couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []).
+is_valid_purge_client(DbName, Props) ->
+ Handle = couch_epi:get_handle(?SERVICE_ID),
+ %% callbacks return true only if it specifically allow the given Id
+ couch_epi:any(Handle, ?SERVICE_ID, is_valid_purge_client, [DbName, Props], []).
+
on_compact(DbName, DDocs) ->
Handle = couch_epi:get_handle(?SERVICE_ID),
couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []).
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 6a30d65..7297718 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -94,79 +94,28 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
{reply, ok, Db3, idle_limit()};
-handle_call({purge_docs, _IdRevs}, _From,
- #db{compactor_pid=Pid}=Db) when Pid /= nil ->
- {reply, {error, purge_during_compaction}, Db, idle_limit()};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
- DocIds = [Id || {Id, _Revs} <- IdRevs],
- OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
- NewDocInfos = lists:flatmap(fun
- ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
- case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, [] = _RemovedRevs} -> % no change
- [];
- {NewTree, RemovedRevs} ->
- NewFDI = FDI#full_doc_info{rev_tree = NewTree},
- [{FDI, NewFDI, RemovedRevs}]
- end;
- ({_, not_found}) ->
- []
- end, lists:zip(IdRevs, OldDocInfos)),
-
- InitUpdateSeq = couch_db_engine:get_update_seq(Db),
- InitAcc = {InitUpdateSeq, [], []},
- FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
- #full_doc_info{
- id = Id,
- rev_tree = OldTree
- } = OldFDI,
- {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
- {NewFDIAcc, NewSeqAcc} = case OldTree of
- [] ->
- % If we purged every #leaf{} in the doc record
- % then we're removing it completely from the
- % database.
- {FDIAcc, SeqAcc0};
- _ ->
- % Its possible to purge the #leaf{} that contains
- % the update_seq where this doc sits in the update_seq
- % sequence. Rather than do a bunch of complicated checks
- % we just re-label every #leaf{} and reinsert it into
- % the update_seq sequence.
- {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
- (_RevId, Leaf, leaf, InnerSeqAcc) ->
- {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
- (_RevId, Value, _Type, InnerSeqAcc) ->
- {Value, InnerSeqAcc}
- end, SeqAcc0, OldTree),
-
- NewFDI = OldFDI#full_doc_info{
- update_seq = SeqAcc1,
- rev_tree = NewTree
- },
-
- {[NewFDI | FDIAcc], SeqAcc1}
- end,
- NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
- {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
- end, InitAcc, NewDocInfos),
-
- {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
-
- % We need to only use the list of #full_doc_info{} records
- % that we have actually changed due to a purge.
- PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
- Pairs = pair_purge_info(PreviousFDIs, FDIs),
-
- {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
- Db3 = commit_data(Db2),
- ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
- couch_event:notify(Db#db.name, updated),
+handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ {reply, ok, Db2, idle_limit()};
- PurgeSeq = couch_db_engine:get_purge_seq(Db3),
- {reply, {ok, PurgeSeq, PurgedIdRevs}, Db3, idle_limit()};
+handle_call({purge_docs, [], _}, _From, Db) ->
+ {reply, {ok, []}, Db, idle_limit()};
+
+handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
+ % Filter out any previously applied updates during
+ % internal replication
+ IsRepl = lists:member(replicated_changes, Options),
+ PurgeReqs = if not IsRepl -> PurgeReqs0; true ->
+ UUIDs = [UUID || {UUID, _Id, _Revs} <- PurgeReqs0],
+ PurgeInfos = couch_db_engine:load_purge_infos(Db, UUIDs),
+ lists:flatmap(fun
+ ({not_found, PReq}) -> [PReq];
+ ({{_, _, _, _}, _}) -> []
+ end, lists:zip(PurgeInfos, PurgeReqs0))
+ end,
+ {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
+ {reply, {ok, Replies}, NewDb, idle_limit()};
handle_call(Msg, From, Db) ->
case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
@@ -646,7 +595,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
Pairs = pair_write_info(OldDocLookups, IndexFDIs),
LocalDocs2 = update_local_doc_revs(LocalDocs),
- {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+ {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
WriteCount = length(IndexFDIs),
couch_stats:increment_counter([couchdb, document_inserts],
@@ -692,6 +641,87 @@ update_local_doc_revs(Docs) ->
end, Docs).
+purge_docs(Db, []) ->
+ {ok, Db, []};
+
+purge_docs(Db, PurgeReqs) ->
+ Ids = lists:usort(lists:map(fun({_UUID, Id, _Revs}) -> Id end, PurgeReqs)),
+ FDIs = couch_db_engine:open_docs(Db, Ids),
+ USeq = couch_db_engine:get_update_seq(Db),
+
+ IdFDIs = lists:zip(Ids, FDIs),
+ {NewIdFDIs, Replies} = apply_purge_reqs(PurgeReqs, IdFDIs, USeq, []),
+
+ Pairs = lists:flatmap(fun({DocId, OldFDI}) ->
+ {DocId, NewFDI} = lists:keyfind(DocId, 1, NewIdFDIs),
+ case {OldFDI, NewFDI} of
+ {not_found, not_found} ->
+ [];
+ {#full_doc_info{} = A, #full_doc_info{} = A} ->
+ [];
+ {#full_doc_info{}, _} ->
+ [{OldFDI, NewFDI}]
+ end
+ end, IdFDIs),
+
+ PSeq = couch_db_engine:get_purge_seq(Db),
+ {RevPInfos, _} = lists:foldl(fun({UUID, DocId, Revs}, {PIAcc, PSeqAcc}) ->
+ Info = {PSeqAcc + 1, UUID, DocId, Revs},
+ {[Info | PIAcc], PSeqAcc + 1}
+ end, {[], PSeq}, PurgeReqs),
+ PInfos = lists:reverse(RevPInfos),
+
+ {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos),
+ Db2 = commit_data(Db1),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ couch_event:notify(Db2#db.name, updated),
+ {ok, Db2, Replies}.
+
+
+apply_purge_reqs([], IdFDIs, _USeq, Replies) ->
+ {IdFDIs, lists:reverse(Replies)};
+
+apply_purge_reqs([Req | RestReqs], IdFDIs, USeq, Replies) ->
+ {_UUID, DocId, Revs} = Req,
+ {value, {_, FDI0}, RestIdFDIs} = lists:keytake(DocId, 1, IdFDIs),
+ {NewFDI, RemovedRevs, NewUSeq} = case FDI0 of
+ #full_doc_info{rev_tree = Tree} ->
+ case couch_key_tree:remove_leafs(Tree, Revs) of
+ {_, []} ->
+ % No change
+ {FDI0, [], USeq};
+ {[], Removed} ->
+ % Completely purged
+ {not_found, Removed, USeq};
+ {NewTree, Removed} ->
+ % Its possible to purge the #leaf{} that contains
+ % the update_seq where this doc sits in the
+ % update_seq sequence. Rather than do a bunch of
+ % complicated checks we just re-label every #leaf{}
+ % and reinsert it into the update_seq sequence.
+ {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, SeqAcc) ->
+ {Leaf#leaf{seq = SeqAcc + 1},
+ SeqAcc + 1};
+ (_RevId, Value, _Type, SeqAcc) ->
+ {Value, SeqAcc}
+ end, USeq, NewTree),
+
+ FDI1 = FDI0#full_doc_info{
+ update_seq = NewUpdateSeq,
+ rev_tree = NewTree2
+ },
+ {FDI1, Removed, NewUpdateSeq}
+ end;
+ not_found ->
+ % Not found means nothing to change
+ {not_found, [], USeq}
+ end,
+ NewIdFDIs = [{DocId, NewFDI} | RestIdFDIs],
+ NewReplies = [{ok, RemovedRevs} | Replies],
+ apply_purge_reqs(RestReqs, NewIdFDIs, NewUSeq, NewReplies).
+
+
commit_data(Db) ->
commit_data(Db, false).
@@ -721,15 +751,6 @@ pair_write_info(Old, New) ->
end, New).
-pair_purge_info(Old, New) ->
- lists:map(fun(OldFDI) ->
- case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
- #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
- false -> {OldFDI, not_found}
- end
- end, Old).
-
-
get_meta_body_size(Meta) ->
{ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta),
ExternalSize.
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index 99b1192..81209d9 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -376,17 +376,22 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
+ couch_stats:increment_counter([couchdb, httpd, purge_requests]),
couch_httpd:validate_ctype(Req, "application/json"),
- {IdsRevs} = couch_httpd:json_body_obj(Req),
- IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
+ {IdRevs} = couch_httpd:json_body_obj(Req),
+ PurgeReqs = lists:map(fun({Id, JsonRevs}) ->
+ {couch_uuids:new(), Id, couch_doc:parse_revs(JsonRevs)}
+ end, IdRevs),
- case couch_db:purge_docs(Db, IdsRevs2) of
- {ok, PurgeSeq, PurgedIdsRevs} ->
- PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
- send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
- Error ->
- throw(Error)
- end;
+ {ok, Replies} = couch_db:purge_docs(Db, PurgeReqs),
+
+ Results = lists:zipwith(fun({Id, _}, {ok, Reply}) ->
+ {Id, couch_doc:revs_to_strs(Reply)}
+ end, IdRevs, Replies),
+
+ {ok, Db2} = couch_db:reopen(Db),
+ PurgeSeq = couch_db:get_purge_seq(Db2),
+ send_json(Req, 200, {[{purge_seq, PurgeSeq}, {purged, {Results}}]});
db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");