You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/10/18 21:46:00 UTC

[couchdb] branch prototype/fdb-layer-parallel-view-builds updated (3ad9a4d -> 0765786)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch prototype/fdb-layer-parallel-view-builds
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 3ad9a4d  Parallelize view builds
     new 0765786  Parallelize view builds

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (3ad9a4d)
            \
             N -- N -- N   refs/heads/prototype/fdb-layer-parallel-view-builds (0765786)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch_views/src/couch_views_indexer.erl | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)


[couchdb] 01/01: Parallelize view builds

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-parallel-view-builds
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 07657867889b106a1a639660416ef69b714d1247
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Fri Oct 18 16:32:14 2019 -0500

    Parallelize view builds
---
 src/couch_views/src/couch_views_indexer.erl | 291 +++++++++++++++++-----------
 1 file changed, 177 insertions(+), 114 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 55ce063..f0b7117 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -42,7 +42,16 @@ init() ->
         <<"sig">> := JobSig
     } = Data,
 
-    {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+    {ok, Db} = try
+        fabric2_db:open(DbName, [?ADMIN_CTX])
+    catch error:database_does_not_exist ->
+        couch_jobs:finish(undefined, Job, Data#{
+            error => database_does_not_exist,
+            reason => <<"Database was deleted">>
+        }),
+        exit(normal)
+    end,
+
     {ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
     {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
     HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
@@ -57,92 +66,107 @@ init() ->
 
     State = #{
         tx_db => undefined,
-        db_seq => undefined,
         view_seq => undefined,
         last_seq => undefined,
         job => Job,
         job_data => Data,
         count => 0,
         limit => num_changes(),
-        doc_acc => [],
+        batch_size => batch_size(),
+        workers => [],
         design_opts => Mrst#mrst.design_opts
     },
 
     update(Db, Mrst, State).
 
 
-update(#{} = Db, Mrst0, State0) ->
-    {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+update(#{} = Db, MrSt, State0) ->
+    State2 = fabric2_fdb:transactional(Db, fun(TxDb1) ->
         % In the first iteration of update we need
         % to populate our db and view sequences
         State1 = case State0 of
-            #{db_seq := undefined} ->
-                ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst0),
+            #{view_seq := undefined} ->
+                ViewSeq = couch_views_fdb:get_update_seq(TxDb1, MrSt),
                 State0#{
-                    tx_db := TxDb,
-                    db_seq := fabric2_db:get_update_seq(TxDb),
+                    tx_db := TxDb1,
                     view_seq := ViewSeq,
                     last_seq := ViewSeq
                 };
             _ ->
                 State0#{
-                    tx_db := TxDb
+                    tx_db := TxDb1
                 }
         end,
 
-        {ok, State2} = fold_changes(State1),
+        fold_changes(State1, MrSt)
+    end),
 
-        #{
-            count := Count,
-            limit := Limit,
-            doc_acc := DocAcc,
-            last_seq := LastSeq
-        } = State2,
+    #{
+        last_seq := LastSeq,
+        count := Count,
+        limit := Limit,
+        workers := Workers
+    } = State2,
+
+    % Bit odd to be starting with the newest
+    % worker first here but I think it's fine
+    % for now since we're collecting all updates
+    % into a single write transaction
+    Changes = lists:foldl(fun({WPid, WRef}, Acc) ->
+        receive
+            {'DOWN', WRef, process, WPid, {ok, NewChanges}} ->
+                NewChanges ++ Acc;
+            {'DOWN', WRef, process, WPid, Reason} ->
+                exit({worker_update_failed, Reason})
+        after 6000 ->
+            erlang:error("Timeout waiting for worker: ~p", [WPid])
+        end
+    end, [], Workers),
 
-        DocAcc1 = fetch_docs(TxDb, DocAcc),
-        {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
-        write_docs(TxDb, Mrst1, MappedDocs, State2),
+    State4 = fabric2_fdb:transactional(Db, fun(TxDb2) ->
+
+        write_changes(TxDb2, MrSt, LastSeq, Changes),
 
         case Count < Limit of
             true ->
-                report_progress(State2, finished),
-                {Mrst1, finished};
+                report_progress(State2#{tx_db := TxDb2}, finished),
+                finished;
             false ->
-                State3 = report_progress(State2, update),
-                {Mrst1, State3#{
+                State3 = report_progress(State2#{tx_db := TxDb2}, update),
+                State3#{
                     tx_db := undefined,
+                    view_seq := LastSeq,
                     count := 0,
-                    doc_acc := [],
-                    view_seq := LastSeq
-                }}
+                    workers := []
+                }
         end
     end),
 
-    case State4 of
-        finished ->
-            couch_eval:release_map_context(Mrst2#mrst.qserver);
-        _ ->
-            update(Db, Mrst2, State4)
+    if State4 == finished -> ok; true ->
+        update(Db, MrSt, State4)
     end.
 
 
-fold_changes(State) ->
+fold_changes(State0, MrSt) ->
     #{
-        view_seq := SinceSeq,
-        limit := Limit,
-        tx_db := TxDb
-    } = State,
+        tx_db := TxDb,
+        view_seq := Seq,
+        limit := Limit
+    } = State0,
 
     Fun = fun process_changes/2,
-    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+    Acc = {State0, MrSt, []},
+    Opts = [{limit, Limit}],
+    {ok, AccOut} = fabric2_db:fold_changes(TxDb, Seq, Fun, Acc, Opts),
+    spawn_worker(AccOut).
 
 
-process_changes(Change, Acc) ->
+process_changes(Change, {State0, MrSt, Changes0}) ->
     #{
-        doc_acc := DocAcc,
         count := Count,
+        batch_size := BatchSize,
         design_opts := DesignOpts
-    } = Acc,
+    } = State0,
 
     #{
         id := Id,
@@ -151,86 +175,65 @@ process_changes(Change, Acc) ->
 
     IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
 
-    Acc1 = case {Id, IncludeDesign} of
+    Changes1 = case {Id, IncludeDesign} of
         {<<?DESIGN_DOC_PREFIX, _/binary>>, false} ->
-            maps:merge(Acc, #{
-                count => Count + 1,
-                last_seq => LastSeq
-            });
+            Changes0;
         _ ->
-            Acc#{
-                doc_acc := DocAcc ++ [Change],
-                count := Count + 1,
-                last_seq := LastSeq
-            }
+            [Change | Changes0]
     end,
-    {ok, Acc1}.
-
-
-map_docs(Mrst, Docs) ->
-    % Run all the non deleted docs through the view engine and
-    Mrst1 = start_query_server(Mrst),
-    QServer = Mrst1#mrst.qserver,
 
-    {Deleted0, NotDeleted0} = lists:partition(fun(Doc) ->
-        #{deleted := Deleted} = Doc,
-        Deleted
-    end, Docs),
-
-    Deleted1 = lists:map(fun(Doc) ->
-        Doc#{results => []}
-    end, Deleted0),
-
-    DocsToMap = lists:map(fun(Doc) ->
-        #{doc := DocRec} = Doc,
-        DocRec
-    end, NotDeleted0),
-
-    {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
-
-    % The expanded function head here is making an assertion
-    % that the results match the given doc
-    NotDeleted1 = lists:zipwith(fun(#{id := DocId} = Doc, {DocId, Results}) ->
-        Doc#{results => Results}
-    end, NotDeleted0, AllResults),
-
-    % I'm being a bit careful here resorting the docs
-    % in order of the changes feed. Theoretically this is
-    % unnecessary since we're inside a single transaction.
-    % However, I'm concerned if we ever split this up
-    % into multiple transactions that this detail might
-    % be important but forgotten.
-    MappedDocs = lists:sort(fun(A, B) ->
-        #{sequence := ASeq} = A,
-        #{sequence := BSeq} = B,
-        ASeq =< BSeq
-    end, Deleted1 ++ NotDeleted1),
+    State1 = State0#{
+        count := Count + 1,
+        last_seq := LastSeq
+    },
 
-    {Mrst1, MappedDocs}.
+    case length(Changes1) < BatchSize of
+        true ->
+            {ok, {State1, MrSt, Changes1}};
+        false ->
+            State2 = spawn_worker({State1, MrSt, Changes1}),
+            {ok, {State2, MrSt, []}}
+    end.
 
 
-write_docs(TxDb, Mrst, Docs, State) ->
-    #mrst{
-        views = Views,
-        sig = Sig
-    } = Mrst,
+spawn_worker({State, _MrSt, []}) ->
+    State;
 
+spawn_worker({State, MrSt, Changes}) when length(Changes) > 0 ->
     #{
-        last_seq := LastSeq
+        tx_db := #{tx := Tx} = TxDb,
+        workers := Workers
     } = State,
+    ReadVersion = erlfdb:wait(erlfdb:get_read_version(Tx)),
+    WState = State#{
+        tx_db := TxDb#{tx := {read_version, ReadVersion}},
+        workers := []
+    },
+    Worker = spawn_monitor(fun() ->
+        process_changes(WState, MrSt, Changes)
+    end),
+    State#{
+        workers := [Worker | Workers]
+    }.
 
-    ViewIds = [View#mrview.id_num || View <- Views],
-
-    lists:foreach(fun(Doc) ->
-        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
-    end, Docs),
 
-    couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+process_changes(State, MrSt, Changes0) ->
+    #{
+        tx_db := #{tx := {read_version, ReadVersion}} = TxDb0
+    } = State,
+    {ok, Db} = application:get_env(fabric, db),
+    exit(erlfdb:transactional(Db, fun(NewTx) ->
+        erlfdb:set_read_version(NewTx, ReadVersion),
+        TxDb1 = TxDb0#{tx := NewTx},
+        Changes1 = fetch_docs(TxDb1, Changes0),
+        Changes2 = map_docs(MrSt, Changes1),
+        {ok, Changes2}
+    end)).
 
 
 fetch_docs(Db, Changes) ->
-    {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
-        #{deleted := Deleted} = Doc,
+    {Deleted, NotDeleted} = lists:partition(fun(Change) ->
+        #{deleted := Deleted} = Change,
         Deleted
     end, Changes),
 
@@ -269,7 +272,66 @@ fetch_docs(Db, Changes) ->
     Deleted ++ ChangesWithDocs.
 
 
-start_query_server(#mrst{qserver = nil} = Mrst) ->
+map_docs(MrSt, Changes) ->
+    % Run all the non deleted docs through the view engine and
+    {ok, QServer} = get_query_server(MrSt),
+
+    {Deleted0, NotDeleted0} = lists:partition(fun(Change) ->
+        #{deleted := Deleted} = Change,
+        Deleted
+    end, Changes),
+
+    Deleted1 = lists:map(fun(Change) ->
+        Change#{
+            results => []
+        }
+    end, Deleted0),
+
+    DocsToMap = lists:map(fun(Change) ->
+        #{doc := DocRec} = Change,
+        DocRec
+    end, NotDeleted0),
+
+    {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
+
+    % The expanded function head here is making an assertion
+    % that the results match the given doc
+    NotDeleted1 = lists:zipwith(fun(#{id := Id} = Change, {Id, Results}) ->
+        Change#{
+            doc := [],
+            results => Results
+        }
+    end, NotDeleted0, AllResults),
+
+    % I'm being a bit careful here resorting the docs
+    % in order of the changes feed. Theoretically this is
+    % unnecessary since we're inside a single transaction.
+    % However, I'm concerned if we ever split this up
+    % into multiple transactions that this detail might
+    % be important but forgotten.
+    lists:sort(fun(A, B) ->
+        #{sequence := ASeq} = A,
+        #{sequence := BSeq} = B,
+        ASeq =< BSeq
+    end, Deleted1 ++ NotDeleted1).
+
+
+write_changes(TxDb, MrSt, LastSeq, Changes) ->
+    #mrst{
+        views = Views,
+        sig = Sig
+    } = MrSt,
+
+    ViewIds = [View#mrview.id_num || View <- Views],
+
+    lists:foreach(fun(Change) ->
+        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Change)
+    end, Changes),
+
+    couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+
+
+get_query_server(#mrst{qserver = nil} = Mrst) ->
     #mrst{
         db_name = DbName,
         idx_name = DDocId,
@@ -278,18 +340,14 @@ start_query_server(#mrst{qserver = nil} = Mrst) ->
         lib = Lib,
         views = Views
     } = Mrst,
-    {ok, QServer} = couch_eval:acquire_map_context(
+    couch_eval:acquire_map_context(
             DbName,
             DDocId,
             Language,
             Sig,
             Lib,
             [View#mrview.def || View <- Views]
-        ),
-    Mrst#mrst{qserver = QServer};
-
-start_query_server(#mrst{} = Mrst) ->
-    Mrst.
+        ).
 
 
 report_progress(State, UpdateType) ->
@@ -336,4 +394,9 @@ report_progress(State, UpdateType) ->
 
 
 num_changes() ->
-    config:get_integer("couch_views", "change_limit", 100).
+    config:get_integer("couch_views", "change_limit", 1000).
+
+
+batch_size() ->
+    config:get_integer("couch_views", "batch_size", 100).
+