You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/05/19 14:48:36 UTC

[couchdb] 01/01: WIP

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch batch-bulk-docs
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 94ffb280628b0ed0131bb1facf7922388df402a2
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue May 19 01:34:01 2020 -0400

    WIP
---
 src/couch/include/couch_db.hrl |   3 +
 src/fabric/src/fabric2_db.erl  | 175 ++++++++++++++++++++++++++++++-----------
 src/fabric/src/fabric2_fdb.erl |   8 ++
 3 files changed, 140 insertions(+), 46 deletions(-)

diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl
index 830b9bc..89eaca1 100644
--- a/src/couch/include/couch_db.hrl
+++ b/src/couch/include/couch_db.hrl
@@ -116,6 +116,9 @@
 
     deleted = false,
 
+    % Used when processing bulk documents with possible duplicate ids
+    tag = undefined :: undefined | reference(),
+
     % key/value tuple of meta information, provided when using special options:
     % couch_db:open_doc(Db, Id, Options).
     meta = []
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 8764d4e..5e181fe 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -156,6 +156,20 @@
 
 -define(RETURN(Term), throw({?MODULE, Term})).
 
+-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 8000000).
+
+
+% Accumulator threaded through batch_update_docs/1 and the per-transaction
+% batch_update_replicated_tx/1 and batch_update_interactive_tx/1 helpers
+% while a bulk update is split across several transactions.
+-record(bacc, {
+    db, % db handle; replaced by the transactional handle while inside a tx
+    docs, % documents that still have to be processed
+    is_replicated, % NOTE(review): never set or read in the visible code
+    batch_size, % tx size threshold (presumably bytes) that ends a batch
+    options, % update options as passed in by the caller
+    rev_futures, % map of doc tag => winning revs future (interactive only)
+    seen, % ids of docs updated successfully so far (interactive only)
+    results % one result per processed doc, newest first
+}).
+
 
 create(DbName, Options) ->
     case validate_dbname(DbName) of
@@ -861,19 +875,9 @@ update_docs(Db, Docs0, Options) ->
     Docs1 = apply_before_doc_update(Db, Docs0, Options),
     try
         validate_atomic_update(Docs0, lists:member(all_or_nothing, Options)),
-        Resps0 = case lists:member(replicated_changes, Options) of
-            false ->
-                fabric2_fdb:transactional(Db, fun(TxDb) ->
-                    update_docs_interactive(TxDb, Docs1, Options)
-                end);
-            true ->
-                lists:map(fun(Doc) ->
-                    fabric2_fdb:transactional(Db, fun(TxDb) ->
-                        update_doc_int(TxDb, Doc, Options)
-                    end)
-                end, Docs1)
-        end,
-
+        [couch_log:error("XDDDD ~p~n", [D]) || D<-Docs1],
+        Resps0 = batch_update_docs(Db, Docs1, Options),
+        [couch_log:error("~nXXRRRRX ~p~n", [R]) || R<-Resps0],
         % Notify index builder
         fabric2_index:db_updated(name(Db)),
 
@@ -895,7 +899,7 @@ update_docs(Db, Docs0, Options) ->
                     Else
             end
         end, Resps0),
-        case lists:member(replicated_changes, Options) of
+        case is_replicated(Options) of
             true ->
                 {ok, lists:flatmap(fun(R) ->
                     case R of
@@ -1633,12 +1637,8 @@ find_possible_ancestors(RevInfos, MissingRevs) ->
 
 
 apply_before_doc_update(Db, Docs, Options) ->
-    UpdateType = case lists:member(replicated_changes, Options) of
+    % Run every doc through the before_doc_update plugin hook. The hook's
+    % third argument is an update type atom (replicated_changes or
+    % interactive_edit), so the boolean from is_replicated/1 has to be
+    % translated rather than passed through as true/false.
+    UpdateType = case is_replicated(Options) of
         true -> replicated_changes;
         false -> interactive_edit
     end,
     lists:map(fun(Doc) ->
         fabric2_db_plugin:before_doc_update(Db, Doc, UpdateType)
     end, Docs).
 
 
@@ -1647,9 +1647,8 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
         <<?LOCAL_DOC_PREFIX, _/binary>> -> true;
         _ -> false
     end,
-    IsReplicated = lists:member(replicated_changes, Options),
     try
-        case {IsLocal, IsReplicated} of
+        case {IsLocal, is_replicated(Options)} of
             {false, false} -> update_doc_interactive(Db, Doc, Options);
             {false, true} -> update_doc_replicated(Db, Doc, Options);
             {true, _} -> update_local_doc(Db, Doc, Options)
@@ -1659,17 +1658,100 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
     end.
 
 
-update_docs_interactive(Db, Docs0, Options) ->
-    Docs = tag_docs(Docs0),
-    Futures = get_winning_rev_futures(Db, Docs),
-    {Result, _} = lists:mapfoldl(fun(Doc, SeenIds) ->
-        try
-            update_docs_interactive(Db, Doc, Options, Futures, SeenIds)
-        catch throw:{?MODULE, Return} ->
-            {Return, SeenIds}
+% Apply Docs to Db, splitting the work over as many transactions as the
+% batch size requires. Returns one result per doc, in the order of Docs.
+batch_update_docs(Db, Docs, Options) ->
+    Initial = #bacc{
+        db = Db,
+        docs = Docs,
+        options = Options,
+        batch_size = get_batch_size(Options),
+        seen = [],
+        results = []
+    },
+    Final = batch_update_docs(Initial),
+    % Results are consed onto the accumulator, so they come back reversed
+    lists:reverse(Final#bacc.results).
+
+
+% Run one transaction per batch until no docs are left in the accumulator.
+batch_update_docs(#bacc{docs = []} = Done) ->
+    Done;
+
+batch_update_docs(#bacc{db = Db, docs = Pending, options = Options} = BAcc) ->
+    AfterTx = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        case is_replicated(Options) of
+            true ->
+                batch_update_replicated_tx(BAcc#bacc{db = TxDb});
+            false ->
+                % Every pending doc gets a unique tag; the tag is the key
+                % into the winning rev futures map, which keeps docs that
+                % share an id apart.
+                Tagged = lists:map(fun(#doc{} = Doc) ->
+                    Doc#doc{tag = make_ref()}
+                end, Pending),
+                batch_update_interactive_tx(BAcc#bacc{
+                    db = TxDb,
+                    docs = Tagged,
+                    rev_futures = get_winning_rev_futures(TxDb, Tagged)
+                })
+        end
+    end),
+    % Put the non-transactional handle back before starting the next batch
+    batch_update_docs(AfterTx#bacc{db = Db}).
+
+
+% Apply replicated updates inside the current transaction, one doc at a
+% time, and stop as soon as the approximate transaction size exceeds the
+% batch size. Unprocessed docs are left in the accumulator for the caller.
+batch_update_replicated_tx(#bacc{docs = []} = Done) ->
+    Done;
+
+batch_update_replicated_tx(#bacc{} = BAcc) ->
+    #bacc{
+        db = TxDb,
+        docs = [Doc | Rest],
+        options = Options,
+        batch_size = MaxSize,
+        results = Acc
+    } = BAcc,
+    Result = update_doc_int(TxDb, Doc, Options),
+    Next = BAcc#bacc{
+        docs = Rest,
+        results = [Result | Acc]
+    },
+    % The size is checked after the write, so each call handles >= 1 doc
+    TxSize = fabric2_fdb:get_approximate_tx_size(TxDb),
+    if
+        TxSize > MaxSize -> Next;
+        true -> batch_update_replicated_tx(Next)
+    end.
+
+
+% Apply interactive updates inside the current transaction, one doc at a
+% time, and stop as soon as the approximate transaction size exceeds the
+% batch size. Unprocessed docs are left in the accumulator for the caller.
+batch_update_interactive_tx(#bacc{docs = []} = Done) ->
+    Done;
+
+batch_update_interactive_tx(#bacc{} = BAcc) ->
+    #bacc{
+        db = TxDb,
+        docs = [Doc | Rest],
+        options = Options,
+        batch_size = MaxSize,
+        rev_futures = RevFutures,
+        seen = SeenIds,
+        results = Acc
+    } = BAcc,
+    % A ?RETURN(...) throw becomes this doc's result and leaves the list of
+    % seen ids untouched.
+    {Result, SeenIds1} = try
+        update_docs_interactive(TxDb, Doc, Options, RevFutures, SeenIds)
+    catch throw:{?MODULE, Thrown} ->
+        {Thrown, SeenIds}
+    end,
+    Next = BAcc#bacc{
+        docs = Rest,
+        seen = SeenIds1,
+        results = [Result | Acc]
+    },
+    % The size is checked after the write, so each call handles >= 1 doc
+    TxSize = fabric2_fdb:get_approximate_tx_size(TxDb),
+    if
+        TxSize > MaxSize -> Next;
+        true -> batch_update_interactive_tx(Next)
+    end.
 
 
 update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc,
@@ -1681,7 +1763,7 @@ update_docs_interactive(Db, Doc, Options, Futures, SeenIds) ->
         true ->
             {conflict, SeenIds};
         false ->
-            Future = maps:get(doc_tag(Doc), Futures),
+            Future = maps:get(Doc#doc.tag, Futures),
             case update_doc_interactive(Db, Doc, Future, Options) of
                 {ok, _} = Resp ->
                     {Resp, [Doc#doc.id | SeenIds]};
@@ -1958,7 +2040,7 @@ get_winning_rev_futures(Db, Docs) ->
         if IsLocal -> Acc; true ->
             NumRevs = if Deleted -> 2; true -> 1 end,
             Future = fabric2_fdb:get_winning_revs_future(Db, DocId, NumRevs),
-            DocTag = doc_tag(Doc),
+            DocTag = Doc#doc.tag,
             Acc#{DocTag => Future}
         end
     end, #{}, Docs).
@@ -2119,19 +2201,6 @@ doc_to_revid(#doc{revs = Revs}) ->
     end.
 
 
-tag_docs([]) ->
-    [];
-tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
-    NewDoc = Doc#doc{
-        meta = [{ref, make_ref()} | Meta]
-    },
-    [NewDoc | tag_docs(Rest)].
-
-
-doc_tag(#doc{meta = Meta}) ->
-    fabric2_util:get_value(ref, Meta).
-
-
 idrevs({Id, Revs}) when is_list(Revs) ->
     {docid(Id), [rev(R) || R <- Revs]}.
 
@@ -2226,3 +2295,17 @@ get_cached_db(#{} = Db, Opts) when is_list(Opts) ->
                 fabric2_fdb:ensure_current(TxDb)
             end)
     end.
+
+
+% True when the option list contains the replicated_changes flag
+is_replicated(Opts) when is_list(Opts) ->
+    lists:member(replicated_changes, Opts).
+
+
+% Pick the batch size for a bulk update: an integer batch_size option takes
+% precedence, otherwise the "fabric" / "update_docs_batch_size" config
+% value is used, with ?DEFAULT_UPDATE_DOCS_BATCH_SIZE as the fallback.
+get_batch_size(Options) ->
+    Requested = fabric2_util:get_value(batch_size, Options),
+    case Requested of
+        Size when is_integer(Size) ->
+            Size;
+        undefined ->
+            config:get_integer("fabric", "update_docs_batch_size",
+                ?DEFAULT_UPDATE_DOCS_BATCH_SIZE)
+    end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index f274aa6..5ff1ed0 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -75,6 +75,8 @@
 
     new_versionstamp/1,
 
+    get_approximate_tx_size/1,
+
     debug_cluster/0,
     debug_cluster/2
 ]).
@@ -1159,6 +1161,12 @@ new_versionstamp(Tx) ->
     {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
 
 
+% Return erlfdb's approximate size for the transaction held by TxDb.
+% NOTE(review): presumably the size is in bytes - confirm against erlfdb.
+get_approximate_tx_size(#{} = TxDb) ->
+    % Check for an open transaction before reading the tx handle
+    require_transaction(TxDb),
+    #{tx := Tx} = TxDb,
+    erlfdb:get_approximate_size(Tx).
+
+
 debug_cluster() ->
     debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).