You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/05/19 16:56:18 UTC

[couchdb] 01/01: Add batching for _bulk_docs

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch batch-bulk-docs
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 230ce7b372fb5a4cbfcc7b95a24b5ba9f9ed7696
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue May 19 12:52:02 2020 -0400

    Add batching for _bulk_docs
    
     * Interactive (regular) _bulk_docs requests can be split into smaller transactions.
    
     * Non-interactive (replicated) _bulk_docs requests can be applied in batches
       with more than one document per transaction.
---
 src/fabric/src/fabric2_db.erl  | 171 ++++++++++++++++++++++++++++++++++-------
 src/fabric/src/fabric2_fdb.erl |   8 ++
 2 files changed, 151 insertions(+), 28 deletions(-)

diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 8764d4e..fb3a82d 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -156,6 +156,21 @@
 
 -define(RETURN(Term), throw({?MODULE, Term})).
 
+-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 5000000).
+
+
+-record(bacc, {
+    db,
+    docs,
+    is_replicated,
+    batch_size,
+    options,
+    rev_futures,
+    seen,
+    seen_tx,
+    results
+}).
+
 
 create(DbName, Options) ->
     case validate_dbname(DbName) of
@@ -861,18 +876,8 @@ update_docs(Db, Docs0, Options) ->
     Docs1 = apply_before_doc_update(Db, Docs0, Options),
     try
         validate_atomic_update(Docs0, lists:member(all_or_nothing, Options)),
-        Resps0 = case lists:member(replicated_changes, Options) of
-            false ->
-                fabric2_fdb:transactional(Db, fun(TxDb) ->
-                    update_docs_interactive(TxDb, Docs1, Options)
-                end);
-            true ->
-                lists:map(fun(Doc) ->
-                    fabric2_fdb:transactional(Db, fun(TxDb) ->
-                        update_doc_int(TxDb, Doc, Options)
-                    end)
-                end, Docs1)
-        end,
+
+        Resps0 = batch_update_docs(Db, Docs1, Options),
 
         % Notify index builder
         fabric2_index:db_updated(name(Db)),
@@ -895,7 +900,7 @@ update_docs(Db, Docs0, Options) ->
                     Else
             end
         end, Resps0),
-        case lists:member(replicated_changes, Options) of
+        case is_replicated(Options) of
             true ->
                 {ok, lists:flatmap(fun(R) ->
                     case R of
@@ -1647,9 +1652,8 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
         <<?LOCAL_DOC_PREFIX, _/binary>> -> true;
         _ -> false
     end,
-    IsReplicated = lists:member(replicated_changes, Options),
     try
-        case {IsLocal, IsReplicated} of
+        case {IsLocal, is_replicated(Options)} of
             {false, false} -> update_doc_interactive(Db, Doc, Options);
             {false, true} -> update_doc_replicated(Db, Doc, Options);
             {true, _} -> update_local_doc(Db, Doc, Options)
@@ -1659,17 +1663,115 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
     end.
 
 
-update_docs_interactive(Db, Docs0, Options) ->
-    Docs = tag_docs(Docs0),
-    Futures = get_winning_rev_futures(Db, Docs),
-    {Result, _} = lists:mapfoldl(fun(Doc, SeenIds) ->
-        try
-            update_docs_interactive(Db, Doc, Options, Futures, SeenIds)
-        catch throw:{?MODULE, Return} ->
-            {Return, SeenIds}
+batch_update_docs(Db, Docs, Options) ->
+    BAcc = #bacc{
+        db = Db,
+        docs = Docs,
+        batch_size = get_batch_size(Options),
+        options = Options,
+        rev_futures = #{},
+        seen = [],
+        seen_tx = [],
+        results = []
+    },
+    #bacc{results = Res} = batch_update_docs(BAcc),
+    lists:reverse(Res).
+
+
+batch_update_docs(#bacc{docs = []} = BAcc) ->
+    BAcc;
+
+batch_update_docs(#bacc{db = Db} = BAcc) ->
+    #bacc{
+        db = Db,
+        docs = Docs,
+        options = Options
+    } = BAcc,
+    BAccTx2 = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        BAccTx = BAcc#bacc{db = TxDb},
+        case is_replicated(Options) of
+            true ->
+                batch_update_replicated_tx(BAccTx);
+            false ->
+                Tagged = tag_docs(Docs),
+                RevFutures = get_winning_rev_futures(TxDb, Tagged),
+                BAccTx1 = BAccTx#bacc{
+                    docs = Tagged,
+                    rev_futures = RevFutures
+                },
+                batch_update_interactive_tx(BAccTx1)
         end
-    end, [], Docs),
-    Result.
+    end),
+    % Clean up after the transaction ends so we can recurse with a clean state
+    maps:map(fun(Tag, {fold_info, _St, Future}) when is_reference(Tag) ->
+        ok = erlfdb:cancel(Future)
+    end, BAccTx2#bacc.rev_futures),
+    BAcc1 = BAccTx2#bacc{
+        db = Db,
+        rev_futures = #{},
+        seen_tx = []
+    },
+    batch_update_docs(BAcc1).
+
+
+batch_update_replicated_tx(#bacc{docs = []} = BAcc) ->
+    BAcc;
+
+batch_update_replicated_tx(#bacc{} = BAcc) ->
+    #bacc{
+        db = TxDb,
+        docs = [Doc | Docs],
+        options = Options,
+        batch_size = MaxSize,
+        seen_tx = SeenTx,
+        results = Results
+    } = BAcc,
+    case lists:member(Doc#doc.id, SeenTx) of
+        true ->
+            % If we already updated this doc in the current transaction, skip
+            % to the next transaction
+            BAcc;
+        false ->
+            Res = update_doc_int(TxDb, Doc, Options),
+            BAcc1 = BAcc#bacc{
+                docs = Docs,
+                results = [Res | Results],
+                seen_tx = [Doc#doc.id | SeenTx]
+            },
+            case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of
+                true -> BAcc1;
+                false -> batch_update_replicated_tx(BAcc1)
+            end
+    end.
+
+
+batch_update_interactive_tx(#bacc{docs = []} = BAcc) ->
+    BAcc;
+
+batch_update_interactive_tx(#bacc{} = BAcc) ->
+    #bacc{
+        db = TxDb,
+        docs = [Doc | Docs],
+        options = Options,
+        batch_size = MaxSize,
+        rev_futures = RevFutures,
+        seen = Seen,
+        results = Results
+    } = BAcc,
+    {Res, Seen1} = try
+        update_docs_interactive(TxDb, Doc, Options, RevFutures, Seen)
+    catch throw:{?MODULE, Return} ->
+        {Return, Seen}
+    end,
+    BAcc1 = BAcc#bacc{
+        docs = Docs,
+        results = [Res | Results],
+        seen = Seen1
+    },
+    case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of
+        true -> BAcc1;
+        false -> batch_update_interactive_tx(BAcc1)
+    end.
 
 
 update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc,
@@ -2122,9 +2224,8 @@ doc_to_revid(#doc{revs = Revs}) ->
 tag_docs([]) ->
     [];
 tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
-    NewDoc = Doc#doc{
-        meta = [{ref, make_ref()} | Meta]
-    },
+    Meta1 = lists:keystore(ref, 1, Meta, {ref, make_ref()}),
+    NewDoc = Doc#doc{meta = Meta1},
     [NewDoc | tag_docs(Rest)].
 
 
@@ -2226,3 +2327,17 @@ get_cached_db(#{} = Db, Opts) when is_list(Opts) ->
                 fabric2_fdb:ensure_current(TxDb)
             end)
     end.
+
+
+is_replicated(Options) when is_list(Options) ->
+    lists:member(replicated_changes, Options).
+
+
+get_batch_size(Options) ->
+    case fabric2_util:get_value(batch_size, Options) of
+        undefined ->
+            config:get_integer("fabric", "update_docs_batch_size",
+                ?DEFAULT_UPDATE_DOCS_BATCH_SIZE);
+        Val when is_integer(Val) ->
+            Val
+    end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index f274aa6..5ff1ed0 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -75,6 +75,8 @@
 
     new_versionstamp/1,
 
+    get_approximate_tx_size/1,
+
     debug_cluster/0,
     debug_cluster/2
 ]).
@@ -1159,6 +1161,12 @@ new_versionstamp(Tx) ->
     {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
 
 
+get_approximate_tx_size(#{} = TxDb) ->
+    require_transaction(TxDb),
+    #{tx := Tx} = TxDb,
+    erlfdb:get_approximate_size(Tx).
+
+
 debug_cluster() ->
     debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).