You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/05/19 16:56:18 UTC
[couchdb] 01/01: Add batching for _bulk_docs
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch batch-bulk-docs
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 230ce7b372fb5a4cbfcc7b95a24b5ba9f9ed7696
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue May 19 12:52:02 2020 -0400
Add batching for _bulk_docs
* Interactive (regular) _bulk_docs can be split into smaller transactions.
* Non-interactive (replicated) _bulk_docs requests can be updated in batches
with more than one document per transaction.
---
src/fabric/src/fabric2_db.erl | 171 ++++++++++++++++++++++++++++++++++-------
src/fabric/src/fabric2_fdb.erl | 8 ++
2 files changed, 151 insertions(+), 28 deletions(-)
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 8764d4e..fb3a82d 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -156,6 +156,21 @@
-define(RETURN(Term), throw({?MODULE, Term})).
+-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 5000000). % fallback used by get_batch_size/1; compared against fabric2_fdb:get_approximate_tx_size/1 (presumably bytes - confirm)
+
+
+-record(bacc, { % accumulator threaded through the batch_update_docs* functions
+ db, % database handle; replaced by the transactional handle while a transaction runs
+ docs, % documents that still have to be written
+ is_replicated, % NOTE(review): not populated by batch_update_docs/3; callers use is_replicated(Options) instead
+ batch_size, % size threshold from get_batch_size/1; a transaction stops taking docs once it is exceeded
+ options, % update options as passed by the caller
+ rev_futures, % map of winning-rev futures for the interactive path; reset to #{} after each transaction
+ seen, % accumulator handed to update_docs_interactive/5; kept across transactions
+ seen_tx, % doc ids already updated in the current transaction (replicated path); reset to [] after each transaction
+ results % per-document results, accumulated in reverse order
+}).
+
create(DbName, Options) ->
case validate_dbname(DbName) of
@@ -861,18 +876,8 @@ update_docs(Db, Docs0, Options) ->
Docs1 = apply_before_doc_update(Db, Docs0, Options),
try
validate_atomic_update(Docs0, lists:member(all_or_nothing, Options)),
- Resps0 = case lists:member(replicated_changes, Options) of
- false ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- update_docs_interactive(TxDb, Docs1, Options)
- end);
- true ->
- lists:map(fun(Doc) ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- update_doc_int(TxDb, Doc, Options)
- end)
- end, Docs1)
- end,
+
+ Resps0 = batch_update_docs(Db, Docs1, Options),
% Notify index builder
fabric2_index:db_updated(name(Db)),
@@ -895,7 +900,7 @@ update_docs(Db, Docs0, Options) ->
Else
end
end, Resps0),
- case lists:member(replicated_changes, Options) of
+ case is_replicated(Options) of
true ->
{ok, lists:flatmap(fun(R) ->
case R of
@@ -1647,9 +1652,8 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
<<?LOCAL_DOC_PREFIX, _/binary>> -> true;
_ -> false
end,
- IsReplicated = lists:member(replicated_changes, Options),
try
- case {IsLocal, IsReplicated} of
+ case {IsLocal, is_replicated(Options)} of
{false, false} -> update_doc_interactive(Db, Doc, Options);
{false, true} -> update_doc_replicated(Db, Doc, Options);
{true, _} -> update_local_doc(Db, Doc, Options)
@@ -1659,17 +1663,115 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
end.
-update_docs_interactive(Db, Docs0, Options) ->
- Docs = tag_docs(Docs0),
- Futures = get_winning_rev_futures(Db, Docs),
- {Result, _} = lists:mapfoldl(fun(Doc, SeenIds) ->
- try
- update_docs_interactive(Db, Doc, Options, Futures, SeenIds)
- catch throw:{?MODULE, Return} ->
- {Return, SeenIds}
+%% Update Docs in one or more transactions and return the per-document
+%% results in the same order as Docs.
+%%
+%% The work is driven by a #bacc{} accumulator: results are pushed onto its
+%% `results` list as documents are processed, so the list is reversed once
+%% at the end.
+batch_update_docs(Db, Docs, Options) ->
+    BatchSize = get_batch_size(Options),
+    Initial = #bacc{
+        db = Db,
+        docs = Docs,
+        options = Options,
+        batch_size = BatchSize,
+        rev_futures = #{},
+        seen = [],
+        seen_tx = [],
+        results = []
+    },
+    #bacc{results = Reversed} = batch_update_docs(Initial),
+    lists:reverse(Reversed).
+
+
+%% Run transactions until the accumulator has no documents left.
+%%
+%% Each recursion step opens one transaction and lets either the replicated
+%% or the interactive helper consume as many documents as it decides to
+%% take. The helper returns the accumulator with the remaining documents,
+%% which are handled by the next step.
+batch_update_docs(#bacc{docs = []} = Done) ->
+    Done;
+
+batch_update_docs(#bacc{db = Db, docs = Pending, options = Options} = BAcc0) ->
+    TxFun = fun(TxDb) ->
+        InTx = BAcc0#bacc{db = TxDb},
+        case is_replicated(Options) of
+            false ->
+                % Interactive updates need a tag on every doc and the
+                % winning-rev futures keyed by those tags.
+                Tagged = tag_docs(Pending),
+                Futures = get_winning_rev_futures(TxDb, Tagged),
+                batch_update_interactive_tx(InTx#bacc{
+                    docs = Tagged,
+                    rev_futures = Futures
+                });
+            true ->
+                batch_update_replicated_tx(InTx)
+        end
+    end,
+    AfterTx = fabric2_fdb:transactional(Db, TxFun),
+    % The transaction is over: cancel every future still held in the
+    % accumulator so the next step starts from a clean state.
+    lists:foreach(fun({Tag, {fold_info, _St, Future}}) when is_reference(Tag) ->
+        ok = erlfdb:cancel(Future)
+    end, maps:to_list(AfterTx#bacc.rev_futures)),
+    % Restore the non-transactional handle and drop per-transaction state
+    % before recursing on the remaining documents.
+    batch_update_docs(AfterTx#bacc{
+        db = Db,
+        rev_futures = #{},
+        seen_tx = []
+    }).
+
+
+%% Apply replicated updates inside the current transaction.
+%%
+%% Documents are taken from the head of `docs` one at a time. The function
+%% returns (ending this transaction's share of the work) when:
+%%   * no documents are left,
+%%   * the next document's id was already updated in this transaction, or
+%%   * the approximate transaction size exceeds `batch_size`.
+%% Whatever is still in `docs` is left for the caller to process.
+batch_update_replicated_tx(#bacc{docs = []} = BAcc) ->
+    BAcc;
+
+batch_update_replicated_tx(#bacc{} = BAcc) ->
+    #bacc{
+        db = TxDb,
+        docs = [Doc | Rest],
+        options = Options,
+        batch_size = Limit,
+        seen_tx = SeenIds,
+        results = Acc
+    } = BAcc,
+    DocId = Doc#doc.id,
+    case lists:member(DocId, SeenIds) of
+        true ->
+            % Same doc id twice in one transaction: return the accumulator
+            % untouched so this doc is retried in the next transaction.
+            BAcc;
+        false ->
+            Result = update_doc_int(TxDb, Doc, Options),
+            Next = BAcc#bacc{
+                docs = Rest,
+                results = [Result | Acc],
+                seen_tx = [DocId | SeenIds]
+            },
+            TxSize = fabric2_fdb:get_approximate_tx_size(TxDb),
+            if
+                TxSize > Limit -> Next;
+                true -> batch_update_replicated_tx(Next)
+            end
+    end.
+
+
+%% Apply interactive updates inside the current transaction.
+%%
+%% Documents are taken from the head of `docs` one at a time until none are
+%% left or the approximate transaction size exceeds `batch_size`. A
+%% `throw:{?MODULE, Return}` raised while updating a document becomes that
+%% document's result and leaves the `seen` accumulator unchanged.
+batch_update_interactive_tx(#bacc{docs = []} = BAcc) ->
+    BAcc;
+
+batch_update_interactive_tx(#bacc{} = BAcc) ->
+    #bacc{
+        db = TxDb,
+        docs = [Doc | Rest],
+        options = Options,
+        batch_size = Limit,
+        rev_futures = Futures,
+        seen = SeenBefore,
+        results = Acc
+    } = BAcc,
+    {Result, SeenAfter} = try
+        update_docs_interactive(TxDb, Doc, Options, Futures, SeenBefore)
+    catch throw:{?MODULE, Return} ->
+        {Return, SeenBefore}
+    end,
+    Next = BAcc#bacc{
+        docs = Rest,
+        results = [Result | Acc],
+        seen = SeenAfter
+    },
+    TxSize = fabric2_fdb:get_approximate_tx_size(TxDb),
+    if
+        TxSize > Limit -> Next;
+        true -> batch_update_interactive_tx(Next)
+    end.
update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc,
@@ -2122,9 +2224,8 @@ doc_to_revid(#doc{revs = Revs}) ->
tag_docs([]) ->
[];
tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
- NewDoc = Doc#doc{
- meta = [{ref, make_ref()} | Meta]
- },
+ Meta1 = lists:keystore(ref, 1, Meta, {ref, make_ref()}),
+ NewDoc = Doc#doc{meta = Meta1},
[NewDoc | tag_docs(Rest)].
@@ -2226,3 +2327,17 @@ get_cached_db(#{} = Db, Opts) when is_list(Opts) ->
fabric2_fdb:ensure_current(TxDb)
end)
end.
+
+
+%% Return true when the option list contains the `replicated_changes`
+%% flag, false otherwise. Only lists are accepted.
+is_replicated(Options) when is_list(Options) ->
+    Flag = replicated_changes,
+    lists:member(Flag, Options).
+
+
+%% Pick the transaction size threshold used for batching.
+%%
+%% An integer `batch_size` entry in Options wins. When the option is absent
+%% the value is read from the "fabric" / "update_docs_batch_size" config
+%% setting, defaulting to ?DEFAULT_UPDATE_DOCS_BATCH_SIZE.
+get_batch_size(Options) ->
+    Requested = fabric2_util:get_value(batch_size, Options),
+    case Requested of
+        Size when is_integer(Size) ->
+            Size;
+        undefined ->
+            config:get_integer("fabric", "update_docs_batch_size",
+                ?DEFAULT_UPDATE_DOCS_BATCH_SIZE)
+    end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index f274aa6..5ff1ed0 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -75,6 +75,8 @@
new_versionstamp/1,
+ get_approximate_tx_size/1,
+
debug_cluster/0,
debug_cluster/2
]).
@@ -1159,6 +1161,12 @@ new_versionstamp(Tx) ->
{versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
+%% Return the approximate size of the current transaction as reported by
+%% erlfdb, resolved to a plain value.
+%%
+%% TxDb must be a transactional db handle: require_transaction/1 is called
+%% first and the transaction is read from the `tx` key.
+get_approximate_tx_size(#{} = TxDb) ->
+    require_transaction(TxDb),
+    #{tx := Tx} = TxDb,
+    % erlfdb:get_approximate_size/1 hands back a future. Callers compare the
+    % result against an integer limit, and in Erlang term order any
+    % non-number is greater than every integer, so an unresolved future
+    % would make every "size > limit" check true. Wait for the value.
+    erlfdb:wait(erlfdb:get_approximate_size(Tx)).
+
+
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).