You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink was lost in this plain-text rendering).
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/10/18 21:32:40 UTC

[couchdb] branch prototype/fdb-layer-parallel-view-builds created (now 3ad9a4d)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch prototype/fdb-layer-parallel-view-builds
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 3ad9a4d  Parallelize view builds

This branch includes the following new commits:

     new 3ad9a4d  Parallelize view builds

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: Parallelize view builds

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-parallel-view-builds
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3ad9a4ded6db786bc5ccfdbd77862c4bb69ccfdb
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Oct 18 16:32:14 2019 -0500

    Parallelize view builds
---
 src/couch_views/src/couch_views_indexer.erl | 293 +++++++++++++++++-----------
 1 file changed, 177 insertions(+), 116 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 55ce063..9f6cabe 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -42,7 +42,16 @@ init() ->
         <<"sig">> := JobSig
     } = Data,
 
-    {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+    {ok, Db} = try
+        fabric2_db:open(DbName, [?ADMIN_CTX])
+    catch error:database_does_not_exist ->
+        couch_jobs:finish(undefined, Job, Data#{
+            error => database_does_not_exist,
+            reason => <<"Database was deleted">>
+        }),
+        exit(normal)
+    end,
+
     {ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
     {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
     HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
@@ -57,92 +66,105 @@ init() ->
 
     State = #{
         tx_db => undefined,
-        db_seq => undefined,
         view_seq => undefined,
         last_seq => undefined,
         job => Job,
         job_data => Data,
         count => 0,
         limit => num_changes(),
-        doc_acc => [],
+        batch_size => batch_size(),
+        workers => [],
         design_opts => Mrst#mrst.design_opts
     },
 
     update(Db, Mrst, State).
 
 
-update(#{} = Db, Mrst0, State0) ->
-    {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+update(#{} = Db, MrSt, State0) ->
+    State2 = fabric2_fdb:transactional(Db, fun(TxDb1) ->
         % In the first iteration of update we need
         % to populate our db and view sequences
         State1 = case State0 of
-            #{db_seq := undefined} ->
-                ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst0),
+            #{view_seq := undefined} ->
+                ViewSeq = couch_views_fdb:get_update_seq(TxDb1, MrSt),
                 State0#{
-                    tx_db := TxDb,
-                    db_seq := fabric2_db:get_update_seq(TxDb),
+                    tx_db := TxDb1,
                     view_seq := ViewSeq,
                     last_seq := ViewSeq
                 };
             _ ->
-                State0#{
-                    tx_db := TxDb
-                }
+                State0
         end,
 
-        {ok, State2} = fold_changes(State1),
+        fold_changes(State1, MrSt)
+    end),
+
+    #{
+        last_seq := LastSeq,
+        count := Count,
+        limit := Limit,
+        workers := Workers
+    } = State2,
+
+    % Bit odd to be starting with the newest
+    % worker first here but I think it's fine
+    % for now since we're collecting all updates
+    % into a single write transaction
+    Changes = lists:foldl(fun({WPid, WRef}, Acc) ->
+        receive
+            {'DOWN', WRef, process, WPid, {ok, NewChanges}} ->
+                NewChanges ++ Acc;
+            {'DOWN', WRef, process, WPid, Reason} ->
+                exit({worker_update_failed, Reason})
+        after 6000 ->
+            erlang:error("Timeout waiting for worker: ~p", [WPid])
+        end
+    end, [], Workers),
 
-        #{
-            count := Count,
-            limit := Limit,
-            doc_acc := DocAcc,
-            last_seq := LastSeq
-        } = State2,
+    State4 = fabric2_fdb:transactional(Db, fun(TxDb2) ->
 
-        DocAcc1 = fetch_docs(TxDb, DocAcc),
-        {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
-        write_docs(TxDb, Mrst1, MappedDocs, State2),
+        write_changes(TxDb2, MrSt, LastSeq, Changes),
 
         case Count < Limit of
             true ->
-                report_progress(State2, finished),
-                {Mrst1, finished};
+                report_progress(State2#{tx_db := TxDb2}, finished),
+                finished;
             false ->
-                State3 = report_progress(State2, update),
-                {Mrst1, State3#{
+                State3 = report_progress(State2#{tx_db := TxDb2}, update),
+                State3#{
                     tx_db := undefined,
+                    view_seq := LastSeq,
                     count := 0,
-                    doc_acc := [],
-                    view_seq := LastSeq
-                }}
+                    workers := []
+                }
         end
     end),
 
-    case State4 of
-        finished ->
-            couch_eval:release_map_context(Mrst2#mrst.qserver);
-        _ ->
-            update(Db, Mrst2, State4)
+    if State4 == finished -> ok; true ->
+        update(Db, MrSt, State4)
     end.
 
 
-fold_changes(State) ->
+fold_changes(State0, MrSt) ->
     #{
-        view_seq := SinceSeq,
-        limit := Limit,
-        tx_db := TxDb
-    } = State,
+        tx_db := TxDb,
+        view_seq := Seq,
+        limit := Limit
+    } = State0,
 
     Fun = fun process_changes/2,
-    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+    Acc = {State0, MrSt, []},
+    Opts = [{limit, Limit}],
+    {ok, AccOut} = fabric2_db:fold_changes(TxDb, Seq, Fun, Acc, Opts),
+    spawn_worker(AccOut).
 
 
-process_changes(Change, Acc) ->
+process_changes(Change, {State0, MrSt, Changes0}) ->
     #{
-        doc_acc := DocAcc,
         count := Count,
+        batch_size := BatchSize,
         design_opts := DesignOpts
-    } = Acc,
+    } = State0,
 
     #{
         id := Id,
@@ -151,86 +173,65 @@ process_changes(Change, Acc) ->
 
     IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
 
-    Acc1 = case {Id, IncludeDesign} of
+    Changes1 = case {Id, IncludeDesign} of
         {<<?DESIGN_DOC_PREFIX, _/binary>>, false} ->
-            maps:merge(Acc, #{
-                count => Count + 1,
-                last_seq => LastSeq
-            });
+            Changes0;
         _ ->
-            Acc#{
-                doc_acc := DocAcc ++ [Change],
-                count := Count + 1,
-                last_seq := LastSeq
-            }
+            [Change | Changes0]
     end,
-    {ok, Acc1}.
-
-
-map_docs(Mrst, Docs) ->
-    % Run all the non deleted docs through the view engine and
-    Mrst1 = start_query_server(Mrst),
-    QServer = Mrst1#mrst.qserver,
-
-    {Deleted0, NotDeleted0} = lists:partition(fun(Doc) ->
-        #{deleted := Deleted} = Doc,
-        Deleted
-    end, Docs),
-
-    Deleted1 = lists:map(fun(Doc) ->
-        Doc#{results => []}
-    end, Deleted0),
 
-    DocsToMap = lists:map(fun(Doc) ->
-        #{doc := DocRec} = Doc,
-        DocRec
-    end, NotDeleted0),
-
-    {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
-
-    % The expanded function head here is making an assertion
-    % that the results match the given doc
-    NotDeleted1 = lists:zipwith(fun(#{id := DocId} = Doc, {DocId, Results}) ->
-        Doc#{results => Results}
-    end, NotDeleted0, AllResults),
-
-    % I'm being a bit careful here resorting the docs
-    % in order of the changes feed. Theoretically this is
-    % unnecessary since we're inside a single transaction.
-    % However, I'm concerned if we ever split this up
-    % into multiple transactions that this detail might
-    % be important but forgotten.
-    MappedDocs = lists:sort(fun(A, B) ->
-        #{sequence := ASeq} = A,
-        #{sequence := BSeq} = B,
-        ASeq =< BSeq
-    end, Deleted1 ++ NotDeleted1),
+    State1 = State0#{
+        count := Count + 1,
+        last_seq := LastSeq
+    },
 
-    {Mrst1, MappedDocs}.
+    case length(Changes1) < BatchSize of
+        true ->
+            {ok, {State1, MrSt, Changes1}};
+        false ->
+            State2 = spawn_worker({State1, MrSt, Changes1}),
+            {ok, {State2, MrSt, []}}
+    end.
 
 
-write_docs(TxDb, Mrst, Docs, State) ->
-    #mrst{
-        views = Views,
-        sig = Sig
-    } = Mrst,
+spawn_worker({State, _MrSt, []}) ->
+    State;
 
+spawn_worker({State, MrSt, Changes}) when length(Changes) > 0 ->
     #{
-        last_seq := LastSeq
+        tx_db := #{tx := Tx} = TxDb,
+        workers := Workers
     } = State,
+    ReadVersion = erlfdb:wait(erlfdb:get_read_version(Tx)),
+    WState = State#{
+        tx_db := TxDb#{tx := {read_version, ReadVersion}},
+        workers := []
+    },
+    Worker = spawn_monitor(fun() ->
+        process_changes(WState, MrSt, Changes)
+    end),
+    State#{
+        workers := [Worker | Workers]
+    }.
 
-    ViewIds = [View#mrview.id_num || View <- Views],
-
-    lists:foreach(fun(Doc) ->
-        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
-    end, Docs),
 
-    couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+process_changes(State, MrSt, Changes0) ->
+    #{
+        tx_db := #{tx := {read_version, ReadVersion}} = TxDb0
+    } = State,
+    {ok, Db} = application:get_env(fabric, db),
+    exit(erlfdb:transactional(Db, fun(NewTx) ->
+        erlfdb:set_read_version(NewTx, ReadVersion),
+        TxDb1 = TxDb0#{tx := NewTx},
+        Changes1 = fetch_docs(TxDb1, Changes0),
+        Changes2 = map_docs(MrSt, Changes1),
+        {ok, Changes2}
+    end)).
 
 
 fetch_docs(Db, Changes) ->
-    {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
-        #{deleted := Deleted} = Doc,
+    {Deleted, NotDeleted} = lists:partition(fun(Change) ->
+        #{deleted := Deleted} = Change,
         Deleted
     end, Changes),
 
@@ -269,7 +270,66 @@ fetch_docs(Db, Changes) ->
     Deleted ++ ChangesWithDocs.
 
 
-start_query_server(#mrst{qserver = nil} = Mrst) ->
+map_docs(MrSt, Changes) ->
+    % Run all the non deleted docs through the view engine and
+    {ok, QServer} = get_query_server(MrSt),
+
+    {Deleted0, NotDeleted0} = lists:partition(fun(Change) ->
+        #{deleted := Deleted} = Change,
+        Deleted
+    end, Changes),
+
+    Deleted1 = lists:map(fun(Change) ->
+        Change#{
+            results => []
+        }
+    end, Deleted0),
+
+    DocsToMap = lists:map(fun(Change) ->
+        #{doc := DocRec} = Change,
+        DocRec
+    end, NotDeleted0),
+
+    {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
+
+    % The expanded function head here is making an assertion
+    % that the results match the given doc
+    NotDeleted1 = lists:zipwith(fun(#{id := Id} = Change, {Id, Results}) ->
+        Change#{
+            doc := [],
+            results => Results
+        }
+    end, NotDeleted0, AllResults),
+
+    % I'm being a bit careful here resorting the docs
+    % in order of the changes feed. Theoretically this is
+    % unnecessary since we're inside a single transaction.
+    % However, I'm concerned if we ever split this up
+    % into multiple transactions that this detail might
+    % be important but forgotten.
+    lists:sort(fun(A, B) ->
+        #{sequence := ASeq} = A,
+        #{sequence := BSeq} = B,
+        ASeq =< BSeq
+    end, Deleted1 ++ NotDeleted1).
+
+
+write_changes(TxDb, MrSt, LastSeq, Changes) ->
+    #mrst{
+        views = Views,
+        sig = Sig
+    } = MrSt,
+
+    ViewIds = [View#mrview.id_num || View <- Views],
+
+    lists:foreach(fun(Change) ->
+        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Change)
+    end, Changes),
+
+    couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+
+
+get_query_server(#mrst{qserver = nil} = Mrst) ->
     #mrst{
         db_name = DbName,
         idx_name = DDocId,
@@ -278,18 +338,14 @@ start_query_server(#mrst{qserver = nil} = Mrst) ->
         lib = Lib,
         views = Views
     } = Mrst,
-    {ok, QServer} = couch_eval:acquire_map_context(
+    couch_eval:acquire_map_context(
             DbName,
             DDocId,
             Language,
             Sig,
             Lib,
             [View#mrview.def || View <- Views]
-        ),
-    Mrst#mrst{qserver = QServer};
-
-start_query_server(#mrst{} = Mrst) ->
-    Mrst.
+        ).
 
 
 report_progress(State, UpdateType) ->
@@ -336,4 +392,9 @@ report_progress(State, UpdateType) ->
 
 
 num_changes() ->
-    config:get_integer("couch_views", "change_limit", 100).
+    config:get_integer("couch_views", "change_limit", 1000).
+
+
+batch_size() ->
+    config:get_integer("couch_views", "batch_size", 100).
+