You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/09/09 15:44:12 UTC
[couchdb] branch prototype/fdb-layer updated: Fetch docs in
parallel for view indexing
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/prototype/fdb-layer by this push:
new 2074412 Fetch docs in parallel for view indexing
2074412 is described below
commit 207441269aa95ad9d683af7d87ced82145ff6843
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Sep 9 16:01:41 2019 +0200
Fetch docs in parallel for view indexing
---
src/couch_views/src/couch_views_indexer.erl | 58 ++++++++++++++++++++++-------
1 file changed, 45 insertions(+), 13 deletions(-)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 60c8194..83d1b6a 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -99,7 +99,8 @@ update(#{} = Db, Mrst0, State0) ->
last_seq := LastSeq
} = State2,
- {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc),
+ DocAcc1 = fetch_docs(TxDb, DocAcc),
+ {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
write_docs(TxDb, Mrst1, MappedDocs, State2),
case Count < Limit of
@@ -140,14 +141,12 @@ process_changes(Change, Acc) ->
#{
doc_acc := DocAcc,
count := Count,
- tx_db := TxDb,
design_opts := DesignOpts
} = Acc,
#{
id := Id,
- sequence := LastSeq,
- deleted := Deleted
+ sequence := LastSeq
} = Change,
IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
@@ -159,16 +158,8 @@ process_changes(Change, Acc) ->
last_seq => LastSeq
});
_ ->
- % Making a note here that we should make fetching all the docs
- % a parallel fdb operation
- {ok, Doc} = case Deleted of
- true -> {ok, []};
- false -> fabric2_db:open_doc(TxDb, Id)
- end,
-
- Change1 = maps:put(doc, Doc, Change),
Acc#{
- doc_acc := DocAcc ++ [Change1],
+ doc_acc := DocAcc ++ [Change],
count := Count + 1,
last_seq := LastSeq
}
@@ -215,6 +206,47 @@ write_docs(TxDb, Mrst, Docs, State) ->
couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+fetch_docs(Db, Changes) ->
+    {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
+        #{deleted := Deleted} = Doc,
+        Deleted
+    end, Changes),
+    % Start a winning-revs read for each non-deleted change, keyed by its future
+    RevState = lists:foldl(fun(Change, Acc) ->
+        #{id := Id} = Change,
+        RevFuture = fabric2_fdb:get_winning_revs_future(Db, Id, 1),
+        Acc#{
+            RevFuture => {Id, Change}
+        }
+    end, #{}, NotDeleted),
+    % Resolve each rev future via get_winning_revs_wait and start its body read
+    RevFutures = maps:keys(RevState),
+    BodyState = lists:foldl(fun(RevFuture, Acc) ->
+        {Id, Change} = maps:get(RevFuture, RevState),
+        Revs = fabric2_fdb:get_winning_revs_wait(Db, RevFuture),
+
+        % Assumes the winning doc body exists within this changes transaction,
+        % because the changes feed listed the doc as not deleted
+        #{winner := true} = RevInfo = lists:last(Revs),
+        BodyFuture = fabric2_fdb:get_doc_body_future(Db, Id, RevInfo),
+        Acc#{
+            BodyFuture => {Id, RevInfo, Change}
+        }
+    end, #{}, erlfdb:wait_for_all(RevFutures)),
+    % Resolve each body future and store the doc on its change under `doc`
+    BodyFutures = maps:keys(BodyState),
+    ChangesWithDocs = lists:map(fun (BodyFuture) ->
+        {Id, RevInfo, Change} = maps:get(BodyFuture, BodyState),
+        Doc = fabric2_fdb:get_doc_body_wait(Db, Id, RevInfo, BodyFuture),
+        Change#{doc => Doc}
+    end, erlfdb:wait_for_all(BodyFutures)),
+    % Combine the deleted changes (returned as-is, no doc attached) with the
+    % changes that now carry docs. The result is no longer in changes order,
+    % which is fine for now but could become an issue if this is later split
+    % across transactions. NOTE(review): confirm map_docs accepts a missing doc.
+    Deleted ++ ChangesWithDocs.
+
+
start_query_server(#mrst{qserver = nil} = Mrst) ->
#mrst{
language = Language,