You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/10/18 21:32:40 UTC
[couchdb] branch prototype/fdb-layer-parallel-view-builds created
(now 3ad9a4d)
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a change to branch prototype/fdb-layer-parallel-view-builds
in repository https://gitbox.apache.org/repos/asf/couchdb.git.
at 3ad9a4d Parallelize view builds
This branch includes the following new commits:
new 3ad9a4d Parallelize view builds
The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[couchdb] 01/01: Parallelize view builds
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch prototype/fdb-layer-parallel-view-builds
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3ad9a4ded6db786bc5ccfdbd77862c4bb69ccfdb
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Oct 18 16:32:14 2019 -0500
Parallelize view builds
---
src/couch_views/src/couch_views_indexer.erl | 293 +++++++++++++++++-----------
1 file changed, 177 insertions(+), 116 deletions(-)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 55ce063..9f6cabe 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -42,7 +42,16 @@ init() ->
<<"sig">> := JobSig
} = Data,
- {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ {ok, Db} = try
+ fabric2_db:open(DbName, [?ADMIN_CTX])
+ catch error:database_does_not_exist ->
+ couch_jobs:finish(undefined, Job, Data#{
+ error => database_does_not_exist,
+ reason => <<"Database was deleted">>
+ }),
+ exit(normal)
+ end,
+
{ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
{ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
@@ -57,92 +66,105 @@ init() ->
State = #{
tx_db => undefined,
- db_seq => undefined,
view_seq => undefined,
last_seq => undefined,
job => Job,
job_data => Data,
count => 0,
limit => num_changes(),
- doc_acc => [],
+ batch_size => batch_size(),
+ workers => [],
design_opts => Mrst#mrst.design_opts
},
update(Db, Mrst, State).
-update(#{} = Db, Mrst0, State0) ->
- {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+update(#{} = Db, MrSt, State0) ->
+ State2 = fabric2_fdb:transactional(Db, fun(TxDb1) ->
% In the first iteration of update we need
% to populate our db and view sequences
State1 = case State0 of
- #{db_seq := undefined} ->
- ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst0),
+ #{view_seq := undefined} ->
+ ViewSeq = couch_views_fdb:get_update_seq(TxDb1, MrSt),
State0#{
- tx_db := TxDb,
- db_seq := fabric2_db:get_update_seq(TxDb),
+ tx_db := TxDb1,
view_seq := ViewSeq,
last_seq := ViewSeq
};
_ ->
- State0#{
- tx_db := TxDb
- }
+ State0
end,
- {ok, State2} = fold_changes(State1),
+ fold_changes(State1, MrSt)
+ end),
+
+ #{
+ last_seq := LastSeq,
+ count := Count,
+ limit := Limit,
+ workers := Workers
+ } = State2,
+
+ % Bit odd to be starting with the newest
+ % worker first here but I think it's fine
+ % for now since we're collecting all updates
+ % into a single write transaction
+ Changes = lists:foldl(fun({WPid, WRef}, Acc) ->
+ receive
+ {'DOWN', WRef, process, WPid, {ok, NewChanges}} ->
+ NewChanges ++ Acc;
+ {'DOWN', WRef, process, WPid, Reason} ->
+ exit({worker_update_failed, Reason})
+ after 6000 ->
+ erlang:error("Timeout waiting for worker: ~p", [WPid])
+ end
+ end, [], Workers),
- #{
- count := Count,
- limit := Limit,
- doc_acc := DocAcc,
- last_seq := LastSeq
- } = State2,
+ State4 = fabric2_fdb:transactional(Db, fun(TxDb2) ->
- DocAcc1 = fetch_docs(TxDb, DocAcc),
- {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
- write_docs(TxDb, Mrst1, MappedDocs, State2),
+ write_changes(TxDb2, MrSt, LastSeq, Changes),
case Count < Limit of
true ->
- report_progress(State2, finished),
- {Mrst1, finished};
+ report_progress(State2#{tx_db := TxDb2}, finished),
+ finished;
false ->
- State3 = report_progress(State2, update),
- {Mrst1, State3#{
+ State3 = report_progress(State2#{tx_db := TxDb2}, update),
+ State3#{
tx_db := undefined,
+ view_seq := LastSeq,
count := 0,
- doc_acc := [],
- view_seq := LastSeq
- }}
+ workers := []
+ }
end
end),
- case State4 of
- finished ->
- couch_eval:release_map_context(Mrst2#mrst.qserver);
- _ ->
- update(Db, Mrst2, State4)
+ if State4 == finished -> ok; true ->
+ update(Db, MrSt, State4)
end.
-fold_changes(State) ->
+fold_changes(State0, MrSt) ->
#{
- view_seq := SinceSeq,
- limit := Limit,
- tx_db := TxDb
- } = State,
+ tx_db := TxDb,
+ view_seq := Seq,
+ limit := Limit
+ } = State0,
Fun = fun process_changes/2,
- fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+ Acc = {State0, MrSt, []},
+ Opts = [{limit, Limit}],
+ {ok, AccOut} = fabric2_db:fold_changes(TxDb, Seq, Fun, Acc, Opts),
+ spawn_worker(AccOut).
-process_changes(Change, Acc) ->
+process_changes(Change, {State0, MrSt, Changes0}) ->
#{
- doc_acc := DocAcc,
count := Count,
+ batch_size := BatchSize,
design_opts := DesignOpts
- } = Acc,
+ } = State0,
#{
id := Id,
@@ -151,86 +173,65 @@ process_changes(Change, Acc) ->
IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
- Acc1 = case {Id, IncludeDesign} of
+ Changes1 = case {Id, IncludeDesign} of
{<<?DESIGN_DOC_PREFIX, _/binary>>, false} ->
- maps:merge(Acc, #{
- count => Count + 1,
- last_seq => LastSeq
- });
+ Changes0;
_ ->
- Acc#{
- doc_acc := DocAcc ++ [Change],
- count := Count + 1,
- last_seq := LastSeq
- }
+ [Change | Changes0]
end,
- {ok, Acc1}.
-
-
-map_docs(Mrst, Docs) ->
- % Run all the non deleted docs through the view engine and
- Mrst1 = start_query_server(Mrst),
- QServer = Mrst1#mrst.qserver,
-
- {Deleted0, NotDeleted0} = lists:partition(fun(Doc) ->
- #{deleted := Deleted} = Doc,
- Deleted
- end, Docs),
-
- Deleted1 = lists:map(fun(Doc) ->
- Doc#{results => []}
- end, Deleted0),
- DocsToMap = lists:map(fun(Doc) ->
- #{doc := DocRec} = Doc,
- DocRec
- end, NotDeleted0),
-
- {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
-
- % The expanded function head here is making an assertion
- % that the results match the given doc
- NotDeleted1 = lists:zipwith(fun(#{id := DocId} = Doc, {DocId, Results}) ->
- Doc#{results => Results}
- end, NotDeleted0, AllResults),
-
- % I'm being a bit careful here resorting the docs
- % in order of the changes feed. Theoretically this is
- % unnecessary since we're inside a single transaction.
- % However, I'm concerned if we ever split this up
- % into multiple transactions that this detail might
- % be important but forgotten.
- MappedDocs = lists:sort(fun(A, B) ->
- #{sequence := ASeq} = A,
- #{sequence := BSeq} = B,
- ASeq =< BSeq
- end, Deleted1 ++ NotDeleted1),
+ State1 = State0#{
+ count := Count + 1,
+ last_seq := LastSeq
+ },
- {Mrst1, MappedDocs}.
+ case length(Changes1) < BatchSize of
+ true ->
+ {ok, {State1, MrSt, Changes1}};
+ false ->
+ State2 = spawn_worker({State1, MrSt, Changes1}),
+ {ok, {State2, MrSt, []}}
+ end.
-write_docs(TxDb, Mrst, Docs, State) ->
- #mrst{
- views = Views,
- sig = Sig
- } = Mrst,
+spawn_worker({State, _MrSt, []}) ->
+ State;
+spawn_worker({State, MrSt, Changes}) when length(Changes) > 0 ->
#{
- last_seq := LastSeq
+ tx_db := #{tx := Tx} = TxDb,
+ workers := Workers
} = State,
+ ReadVersion = erlfdb:wait(erlfdb:get_read_version(Tx)),
+ WState = State#{
+ tx_db := TxDb#{tx := {read_version, ReadVersion}},
+ workers := []
+ },
+ Worker = spawn_monitor(fun() ->
+ process_changes(WState, MrSt, Changes)
+ end),
+ State#{
+ workers := [Worker | Workers]
+ }.
- ViewIds = [View#mrview.id_num || View <- Views],
-
- lists:foreach(fun(Doc) ->
- couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
- end, Docs),
- couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+process_changes(State, MrSt, Changes0) ->
+ #{
+ tx_db := #{tx := {read_version, ReadVersion}} = TxDb0
+ } = State,
+ {ok, Db} = application:get_env(fabric, db),
+ exit(erlfdb:transactional(Db, fun(NewTx) ->
+ erlfdb:set_read_version(NewTx, ReadVersion),
+ TxDb1 = TxDb0#{tx := NewTx},
+ Changes1 = fetch_docs(TxDb1, Changes0),
+ Changes2 = map_docs(MrSt, Changes1),
+ {ok, Changes2}
+ end)).
fetch_docs(Db, Changes) ->
- {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
- #{deleted := Deleted} = Doc,
+ {Deleted, NotDeleted} = lists:partition(fun(Change) ->
+ #{deleted := Deleted} = Change,
Deleted
end, Changes),
@@ -269,7 +270,66 @@ fetch_docs(Db, Changes) ->
Deleted ++ ChangesWithDocs.
-start_query_server(#mrst{qserver = nil} = Mrst) ->
+map_docs(MrSt, Changes) ->
+ % Run all the non deleted docs through the view engine and
+ {ok, QServer} = get_query_server(MrSt),
+
+ {Deleted0, NotDeleted0} = lists:partition(fun(Change) ->
+ #{deleted := Deleted} = Change,
+ Deleted
+ end, Changes),
+
+ Deleted1 = lists:map(fun(Change) ->
+ Change#{
+ results => []
+ }
+ end, Deleted0),
+
+ DocsToMap = lists:map(fun(Change) ->
+ #{doc := DocRec} = Change,
+ DocRec
+ end, NotDeleted0),
+
+ {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
+
+ % The expanded function head here is making an assertion
+ % that the results match the given doc
+ NotDeleted1 = lists:zipwith(fun(#{id := Id} = Change, {Id, Results}) ->
+ Change#{
+ doc := [],
+ results => Results
+ }
+ end, NotDeleted0, AllResults),
+
+ % I'm being a bit careful here resorting the docs
+ % in order of the changes feed. Theoretically this is
+ % unnecessary since we're inside a single transaction.
+ % However, I'm concerned if we ever split this up
+ % into multiple transactions that this detail might
+ % be important but forgotten.
+ lists:sort(fun(A, B) ->
+ #{sequence := ASeq} = A,
+ #{sequence := BSeq} = B,
+ ASeq =< BSeq
+ end, Deleted1 ++ NotDeleted1).
+
+
+write_changes(TxDb, MrSt, LastSeq, Changes) ->
+ #mrst{
+ views = Views,
+ sig = Sig
+ } = MrSt,
+
+ ViewIds = [View#mrview.id_num || View <- Views],
+
+ lists:foreach(fun(Change) ->
+ couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Change)
+ end, Changes),
+
+ couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+
+
+get_query_server(#mrst{qserver = nil} = Mrst) ->
#mrst{
db_name = DbName,
idx_name = DDocId,
@@ -278,18 +338,14 @@ start_query_server(#mrst{qserver = nil} = Mrst) ->
lib = Lib,
views = Views
} = Mrst,
- {ok, QServer} = couch_eval:acquire_map_context(
+ couch_eval:acquire_map_context(
DbName,
DDocId,
Language,
Sig,
Lib,
[View#mrview.def || View <- Views]
- ),
- Mrst#mrst{qserver = QServer};
-
-start_query_server(#mrst{} = Mrst) ->
- Mrst.
+ ).
report_progress(State, UpdateType) ->
@@ -336,4 +392,9 @@ report_progress(State, UpdateType) ->
num_changes() ->
- config:get_integer("couch_views", "change_limit", 100).
+ config:get_integer("couch_views", "change_limit", 1000).
+
+
+batch_size() ->
+ config:get_integer("couch_views", "batch_size", 100).
+