You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/09/09 15:44:12 UTC

[couchdb] branch prototype/fdb-layer updated: Fetch docs in parallel for view indexing

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/fdb-layer by this push:
     new 2074412  Fetch docs in parallel for view indexing
2074412 is described below

commit 207441269aa95ad9d683af7d87ced82145ff6843
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Sep 9 16:01:41 2019 +0200

    Fetch docs in parallel for view indexing
---
 src/couch_views/src/couch_views_indexer.erl | 58 ++++++++++++++++++++++-------
 1 file changed, 45 insertions(+), 13 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 60c8194..83d1b6a 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -99,7 +99,8 @@ update(#{} = Db, Mrst0, State0) ->
             last_seq := LastSeq
         } = State2,
 
-        {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc),
+        DocAcc1 = fetch_docs(TxDb, DocAcc),
+        {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
         write_docs(TxDb, Mrst1, MappedDocs, State2),
 
         case Count < Limit of
@@ -140,14 +141,12 @@ process_changes(Change, Acc) ->
     #{
         doc_acc := DocAcc,
         count := Count,
-        tx_db := TxDb,
         design_opts := DesignOpts
     } = Acc,
 
     #{
         id := Id,
-        sequence := LastSeq,
-        deleted := Deleted
+        sequence := LastSeq
     } = Change,
 
     IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
@@ -159,16 +158,8 @@ process_changes(Change, Acc) ->
                 last_seq => LastSeq
             });
         _ ->
-            % Making a note here that we should make fetching all the docs
-            % a parallel fdb operation
-            {ok, Doc} = case Deleted of
-                true -> {ok, []};
-                false -> fabric2_db:open_doc(TxDb, Id)
-            end,
-
-            Change1 = maps:put(doc, Doc, Change),
             Acc#{
-                doc_acc := DocAcc ++ [Change1],
+                doc_acc := DocAcc ++ [Change],
                 count := Count + 1,
                 last_seq := LastSeq
             }
@@ -215,6 +206,47 @@ write_docs(TxDb, Mrst, Docs, State) ->
     couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
 
 
+fetch_docs(Db, Changes) ->
+    % Adds the winning doc body under `doc` to each non-deleted change. Reads
+    % run in two batches (revs, then bodies), each fully issued before waiting.
+    {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
+        #{deleted := Deleted} = Doc,
+        Deleted
+    end, Changes),
+
+    RevState = lists:foldl(fun(Change, Acc) ->
+        #{id := Id} = Change,
+        RevFuture = fabric2_fdb:get_winning_revs_future(Db, Id, 1),
+        Acc#{
+            RevFuture => {Id, Change}
+        }
+    end, #{}, NotDeleted),
+
+    RevFutures = maps:keys(RevState),
+    BodyState = lists:foldl(fun(RevFuture, Acc) ->
+        {Id, Change} = maps:get(RevFuture, RevState),
+        Revs = fabric2_fdb:get_winning_revs_wait(Db, RevFuture),
+        % Assumes the winning doc body exists in this transaction: the changes
+        % feed lists the doc as not deleted. Crashes here if no winner is found.
+        #{winner := true} = RevInfo = lists:last(Revs),
+        BodyFuture = fabric2_fdb:get_doc_body_future(Db, Id, RevInfo),
+        Acc#{
+            BodyFuture => {Id, RevInfo, Change}
+        }
+    end, #{}, erlfdb:wait_for_all(RevFutures)),
+
+    BodyFutures = maps:keys(BodyState),
+    ChangesWithDocs = lists:map(fun (BodyFuture) ->
+        {Id, RevInfo, Change} = maps:get(BodyFuture, BodyState),
+        Doc = fabric2_fdb:get_doc_body_wait(Db, Id, RevInfo, BodyFuture),
+        Change#{doc => Doc}
+    end, erlfdb:wait_for_all(BodyFutures)),
+
+    % Deleted changes (no `doc` key) come first, then fetched ones in an
+    % unspecified (map-key) order. Revisit if batches ever span transactions.
+    Deleted ++ ChangesWithDocs.
+
+
 start_query_server(#mrst{qserver = nil} = Mrst) ->
     #mrst{
         language = Language,