You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/07/23 21:54:55 UTC
[couchdb] 10/31: Simplify worker vs. indexer distinction
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 14321badcfae21f428e63024b546f671f07b9355
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Jul 17 13:26:31 2019 -0500
Simplify worker vs. indexer distinction
This just turns an indexer into a job handler and removes the worker
concept entirely. couch_views_server just starts `max_workers` indexer
processes that each wait for a job to process. Once processing is
finished, the indexer exits and couch_views_server spawns a new indexer
to replace it.
---
src/couch_views/src/couch_views_indexer.erl | 270 ++++++++++++++--------------
src/couch_views/src/couch_views_worker.erl | 44 -----
2 files changed, 132 insertions(+), 182 deletions(-)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index e9f0b41..1a84116 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -13,55 +13,106 @@
-module(couch_views_indexer).
-export([
- update/2,
- update/4,
-
- % For tests
- map_docs/2,
- write_doc/4
+ spawn_link/0
]).
--include("couch_views.hrl").
+-export([
+ init/0
+]).
+
+-include_lib("couch_views/include/couch_views.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("fabric/src/fabric2.hrl").
-include_lib("eunit/include/eunit.hrl").
-% TODO:
+% TODO:
% * Handle timeouts of transaction and other errors
-update(Db, Mrst) ->
- Noop = fun (_) -> ok end,
- update(Db, Mrst, Noop, []).
-
-
-update(#{} = Db, Mrst, ProgressCallback, ProgressArgs)
- when is_function(ProgressCallback, 6) ->
- try
- Seq = couch_views_fdb:get_update_seq(Db, Mrst),
- State = #{
- since_seq => Seq,
- count => 0,
- limit => config:get_integer("couch_views", "change_limit", 100),
- doc_acc => [],
- last_seq => Seq,
- callback => ProgressCallback,
- callback_args => ProgressArgs,
- mrst => Mrst
- },
- update_int(Db, State)
- catch error:database_does_not_exist ->
- #{db_prefix := DbPrefix} = Db,
- couch_log:notice("couch_views_indexer stopped"
- "- ~p database does not exist", [DbPrefix])
- end.
+
+spawn_link() ->
+ proc_lib:spawn_link(?MODULE, init, []).
+
+
+init() ->
+ {ok, Job, Data} = couch_jobs:accept(?INDEX_JOB_TYPE, #{}),
+
+ #{
+ <<"db_name">> := DbName,
+ <<"ddoc_id">> := DDocId,
+ <<"sig">> := Sig
+ } = Data,
+
+ {ok, Db} = fabric2_db:open(DbName, []),
+ {ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
+ {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+
+ if Mrst#mrst.sig == Sig -> ok; true ->
+ couch_jobs:finish(Job, Data#{error => mismatched_signature}),
+ exit(normal)
+ end,
+
+ State = #{
+ tx_db => undefined,
+ db_seq => undefined,
+ view_seq => undefined,
+ last_seq => undefined,
+ count => 0,
+ limit => num_changes(),
+ doc_acc => [],
+ design_opts => Mrst#mrst.design_opts
+ },
+
+ update(Db, Mrst, State).
+
+
+update(#{} = Db, Mrst0, State0) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ % In the first iteration of update we need
+ % to populate our db and view sequences
+ State1 = case State0 of
+ #{db_seq := undefined} ->
+ State0#{
+ tx_db := TxDb,
+ db_seq := fabric2_db:get_update_seq(TxDb),
+ view_seq := couch_views_fdb:get_update_seq(TxDb, Mrst)
+ };
+ _ ->
+ State0#{
+ tx_db := TxDb
+ }
+ end,
+
+ {ok, State2} = fold_changes(State1),
+
+ #{
+ count := Count,
+ limit := Limit,
+ doc_acc := DocAcc,
+ last_seq := LastSeq
+ } = State2,
+
+ {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc),
+ write_docs(Db, Mrst1, MappedResults, State2),
+
+ case Count < Limit of
+ true ->
+ report_progress(State2, finished);
+ false ->
+ report_progress(State2, update),
+ State3 = maps:merge(FinalState, #{
+ count => 0,
+ doc_acc => [],
+ db_seq => LastSeq,
+ last_seq => 0,
+ mrst => Mrst1
+ }),
+
+ end).
update_int(#{} = Db, State) ->
- {ok, FinalState} = fabric2_fdb:transactional(Db, fun(TxDb) ->
- State1 = maps:put(tx_db, TxDb, State),
- fold_changes(State1)
- end),
+
#{
count := Count,
@@ -73,8 +124,8 @@ update_int(#{} = Db, State) ->
mrst := Mrst
} = FinalState,
- {MappedResults, Mrst1} = map_docs(Mrst, DocAcc),
- write_docs(Db, Mrst1, MappedResults, FinalState),
+ {MappedDocs, Mrst1} = map_docs(Mrst, DocAcc),
+ write_docs(Db, Mrst1, MappedDocs, FinalState),
case Count < Limit of
true ->
@@ -94,13 +145,13 @@ update_int(#{} = Db, State) ->
fold_changes(State) ->
#{
- since_seq := SinceSeq,
+ view_seq := SinceSeq,
limit := Limit,
tx_db := TxDb
} = State,
- fabric2_db:fold_changes(TxDb, SinceSeq,
- fun process_changes/2, State, [{limit, Limit}]).
+ Fun = fun process_changes/2,
+ fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
process_changes(Change, Acc) ->
@@ -108,7 +159,7 @@ process_changes(Change, Acc) ->
doc_acc := DocAcc,
count := Count,
tx_db := TxDb,
- mrst := Mrst
+ design_opts := DesignOpts
} = Acc,
#{
@@ -117,8 +168,7 @@ process_changes(Change, Acc) ->
deleted := Deleted
} = Change,
- IncludeDesign = lists:keymember(<<"include_design">>, 1,
- Mrst#mrst.design_opts),
+ IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
Acc1 = case {Id, IncludeDesign} of
{<<"_design/", _/binary>>, false} ->
@@ -126,16 +176,13 @@ process_changes(Change, Acc) ->
maps:merge(Acc, #{
count => Count + 1,
last_seq => LastSeq
- });
+ });
_ ->
-
% Making a note here that we should make fetching all the docs
% a parallel fdb operation
- Doc = if Deleted -> []; true ->
- case fabric2_db:open_doc(TxDb, Id) of
- {ok, Doc0} -> Doc0;
- {not_found, _} -> []
- end
+ {ok, Doc} = case Deleted of
+ true -> {ok, []};
+ false -> fabric2_db:open_doc(TxDb, Id)
end,
Change1 = maps:put(doc, Doc, Change),
@@ -150,113 +197,60 @@ process_changes(Change, Acc) ->
map_docs(Mrst, Docs) ->
% Run all the non deleted docs through the view engine and
- Mrst1 = get_query_server(Mrst),
+ Mrst1 = start_query_server(Mrst),
QServer = Mrst1#mrst.qserver,
-
MapFun = fun
(#{deleted := true} = Change) ->
maps:put(results, [], Change);
-
(Change) ->
#{doc := Doc} = Change,
couch_stats:increment_counter([couchdb, mrview, map_doc]),
{ok, RawResults} = couch_query_servers:map_doc_raw(QServer, Doc),
JsonResults = couch_query_servers:raw_to_ejson(RawResults),
- ListResults = [[list_to_tuple(Res) || Res <- FunRs]
- || FunRs <- JsonResults],
+ ListResults = lists:map(fun(ViewResults) ->
+ [list_to_tuple(Res) || Res <- ViewResults]
+ end, JsonResults),
maps:put(results, ListResults, Change)
end,
- MappedResults = lists:map(MapFun, Docs),
- {MappedResults, Mrst1}.
+ {Mrst1, lists:map(MapFun, Docs)}.
-start_query_server(#mrst{} = Mrst) ->
+write_docs(TxDb, Mrst, Docs, State) ->
#mrst{
- language=Language,
- lib=Lib,
- views=Views
+ views = Views,
+ sig = Sig
} = Mrst,
- Defs = [View#mrview.def || View <- Views],
- {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib),
- Mrst#mrst{qserver=QServer}.
+ #{
+ last_seq := LastSeq
+ } = State,
-get_query_server(#mrst{} = Mrst) ->
- case Mrst#mrst.qserver of
- nil -> start_query_server(Mrst);
- _ -> Mrst
- end.
+ ViewIds = [View#mrview.id_num || View <- Views],
+
+ lists:foreach(fun(Doc) ->
+ couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+ end, Docs),
+ couch_views_fdb:update_view_seq(TxDb, Sig, LastSeq).
-write_docs(Db, Mrst, Docs, State) ->
+
+start_query_server(#mrst{} = Mrst) ->
#mrst{
- views = Views,
- sig = Sig
+ language = Language,
+ lib = Lib,
+ views = Views
} = Mrst,
+ Defs = [View#mrview.def || View <- Views],
+ {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib),
+ Mrst#mrst{qserver = QServer}.
- #{
- callback := Cb,
- callback_args := CallbackArgs
- } = State,
- IdxNames = lists:map(fun (View) ->
- View#mrview.id_num
- end, Views),
-
- lists:foreach(fun (Doc) ->
- #{sequence := Seq} = Doc,
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- couch_views_fdb:update_view_seq(TxDb, Sig, Seq),
- Cb(TxDb, update, CallbackArgs, Db, Mrst, Seq),
- write_doc(TxDb, Sig, Doc, IdxNames)
- end)
- end, Docs).
-
-
-write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) ->
- #{id := DocId} = Doc,
- lists:foreach(fun (IdxName) ->
- maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName)
- end, ViewIds);
-
-write_doc(TxDb, Sig, Doc, ViewIds) ->
- #{id := DocId, results := Results} = Doc,
- lists:foreach(fun
- ({IdxName, []}) ->
- maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName);
- ({IdxName, IdxResults}) ->
- lists:foldl(fun (IdxResult, DocIdsCleared) ->
- {IdxKey, _} = IdxResult,
- OldIdxKey = couch_views_fdb:get_id_index(TxDb, Sig,
- DocId, IdxName),
- IsAlreadyCleared = lists:member(DocId, DocIdsCleared),
- case OldIdxKey == not_found orelse IsAlreadyCleared == true of
- true ->
- couch_views_fdb:set_id_index(TxDb, Sig, IdxName,
- DocId, IdxKey),
- couch_views_fdb:set_map_index_results(TxDb, Sig,
- IdxName, DocId, IdxResults);
- false ->
- couch_views_fdb:clear_id_index(TxDb, Sig,
- DocId, IdxName),
- couch_views_fdb:clear_map_index(TxDb, Sig, IdxName,
- DocId, OldIdxKey),
- couch_views_fdb:set_id_index(TxDb, Sig, DocId,
- IdxName, IdxKey),
- couch_views_fdb:set_map_index_results(TxDb, Sig,
- IdxName, DocId, IdxResults)
- end,
- [DocId | DocIdsCleared]
- end, [], IdxResults)
- end, lists:zip(ViewIds, Results)).
-
-
-maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName) ->
- OldIdxKey = couch_views_fdb:get_id_index(TxDb, Sig,
- DocId, IdxName),
- if OldIdxKey == not_found -> ok; true ->
- couch_views_fdb:clear_id_index(TxDb, Sig,
- DocId, IdxName),
- couch_views_fdb:clear_map_index(TxDb, Sig, IdxName,
- DocId, OldIdxKey)
+
+report_progress(State, UpdateType) ->
+ case UpdateType of
+ update ->
+ couch_views_jobs:update(Tx, Job, Db, Mrst, LastSeq);
+ finished ->
+ couch_views_jobs:finish(Tx, Job, Db, Mrst, LastSeq)
end.
+
diff --git a/src/couch_views/src/couch_views_worker.erl b/src/couch_views/src/couch_views_worker.erl
deleted file mode 100644
index fa641d5..0000000
--- a/src/couch_views/src/couch_views_worker.erl
+++ /dev/null
@@ -1,44 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_views_worker).
-
--export([
- start/2,
- job_progress/6
-]).
-
-
-start(Job, JobData) ->
- {ok, Db, Mrst} = get_indexing_info(JobData),
- % maybe we should spawn here
- couch_views_indexer:update(Db, Mrst, fun job_progress/6, Job).
-
-
-job_progress(Tx, Progress, Job, Db, Mrst, LastSeq) ->
- case Progress of
- update ->
- couch_views_jobs:update(Tx, Job, Db, Mrst, LastSeq);
- finished ->
- couch_views_jobs:finish(Tx, Job, Db, Mrst, LastSeq)
- end.
-
-
-get_indexing_info(JobData) ->
- #{
- <<"db_name">> := DbName,
- <<"ddoc_id">> := DDocId
- } = JobData,
- {ok, Db} = fabric2_db:open(DbName, []),
- {ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
- {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
- {ok, Db, Mrst}.