You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/07/23 21:54:52 UTC
[couchdb] 07/31: Move jobs logic to couch_view_jobs
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit b16b5b25304281aa9529d663c860327ff4e168f2
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Jul 17 10:23:59 2019 -0500
Move jobs logic to couch_view_jobs
Now that the couch_jobs API is fully baked, we can remove the thin wrapper
API in couch_views_jobs and just use couch_jobs directly.
---
src/couch_views/src/couch_views.erl | 80 ++++---------------
src/couch_views/src/couch_views_jobs.erl | 131 +++++++++++++------------------
2 files changed, 68 insertions(+), 143 deletions(-)
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index c059204..65af1bf 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -44,86 +44,36 @@ query(Db, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
end,
Args = mrargs_to_map(QueryArgs2),
-
- maybe_build_view(Db, MrSt, Args),
+ ok = maybe_update_view(Db, Mrst, Args),
try
couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args)
after
UpdateAfter = maps:get(update, Args) == lazy,
if UpdateAfter == false -> ok; true ->
- maybe_add_couch_job(Db, Mrst)
+ couch_views_jobs:build_view_async(Db, Mrst)
end
end.
-maybe_build_index(_Db, _Mrst, #{update := false}) ->
-    false;
+% Bring the view up to date before it is read, unless the query opted out.
+% `update := false` and `update := lazy` return ok immediately. Any other
+% setting compares the database and view update sequences in a single
+% transaction and, when they differ, calls couch_views_jobs:build_view/3
+% with the database sequence observed in that transaction.
+maybe_update_view(_Db, _Mrst, #{update := false}) ->
+    ok;
-maybe_build_index(_Db, _Mrst, #{update := lazy}) ->
-    false;
+maybe_update_view(_Db, _Mrst, #{update := lazy}) ->
+    ok;
-maybe_build_index(Db, Mrst, _Args) ->
-    {Status, Seq} = fabric2_fdb:transactional(Db, fun(TxDb) ->
-        case view_up_to_date(TxDb, Mrst) of
-            {true, UpdateSeq} ->
-                {ready, UpdateSeq};
-            {false, LatestSeq} ->
-                maybe_add_couch_job(TxDb, Mrst),
-                {false, LatestSeq}
+maybe_update_view(Db, Mrst, _Args) ->
+    WaitSeq = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        DbSeq = fabric2_db:get_update_seq(TxDb),
+        ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
+        case DbSeq == ViewSeq of
+            true -> ready;
+            false -> DbSeq
         end
     end),
-    if Status == ready -> true; true ->
-        subscribe_and_wait_for_index(Db, Mrst, Seq)
-    end.
-
-
-view_up_to_date(Db, Mrst) ->
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-        UpdateSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
-        LastChange = fabric2_fdb:get_last_change(TxDb),
-        {UpdateSeq == LastChange, LastChange}
-    end).
-
-
-maybe_add_couch_job(TxDb, Mrst) ->
-    case couch_views_jobs:status(TxDb, Mrst) of
-        running ->
-            ok;
-        pending ->
-            ok;
-        Status when Status == finished orelse Status == not_found ->
-            couch_views_jobs:add(TxDb, Mrst)
-    end.
-
-
-subscribe_and_wait_for_index(Db, Mrst, Seq) ->
-    case couch_views_jobs:subscribe(Db, Mrst) of
-        {error, Error} ->
-            throw({error, Error});
-        {ok, finished, _} ->
-            ready;
-        {ok, Subscription, _JobState, _} ->
-            wait_for_index_ready(Subscription, Db, Mrst, Seq)
-    end.
-
-
-wait_for_index_ready(Subscription, Db, Mrst, Seq) ->
-    Out = couch_views_jobs:wait(Subscription),
-    case Out of
-        {finished, _JobData} ->
-            ready;
-        {pending, _JobData} ->
-            wait_for_index_ready(Subscription, Db, Mrst, Seq);
-        {running, #{last_seq := LastSeq}} ->
-            if LastSeq =< Seq -> ready; true ->
-                wait_for_index_ready(Subscription, Db, Mrst, Seq)
-            end;
-        {running, _JobData} ->
-            wait_for_index_ready(Subscription, Db, Mrst, Seq);
-        {error, Error} ->
-            throw({error, Error})
+    % DbSeq is bound only inside the transaction fun; WaitSeq is the value
+    % that fun returned, i.e. the database sequence to wait for.
+    if WaitSeq == ready -> ok; true ->
+        couch_views_jobs:build_view(Db, Mrst, WaitSeq)
     end.
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index 31ab728..d9c5157 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -13,110 +13,85 @@
-module(couch_views_jobs).
-export([
- status/2,
- add/2,
-
- accept/0,
- get_job_data/1,
- update/5,
- finish/5,
set_timeout/0,
-
- subscribe/2,
- wait/1,
- unsubscribe/1,
-
- create_job_id/2
+ build_view/3,
+ build_view_async/2
]).
-include("couch_views.hrl").
-% Query request usage of jobs
-
-
-status(TxDb, Mrst) ->
- JobId = create_job_id(TxDb, Mrst),
-
- case couch_jobs:get_job_state(TxDb, ?INDEX_JOB_TYPE, JobId) of
- {ok, State} -> State;
- {error, not_found} -> not_found;
- Error -> Error
- end.
-
-
-add(TxDb, Mrst) ->
- JobData = create_job_data(TxDb, Mrst, 0),
-
- JobId = create_job_id(TxDb, Mrst),
- JTx = couch_jobs_fdb:get_jtx(TxDb),
- couch_jobs:add(JTx, ?INDEX_JOB_TYPE, JobId, JobData).
-
-
-% couch_views_worker api
-
-
-accept() ->
- couch_jobs:accept(?INDEX_JOB_TYPE).
-
-
-get_job_data(JobId) ->
- couch_jobs:get_job_data(undefined, ?INDEX_JOB_TYPE, JobId).
-
-
-update(JTx, Job, Db, Mrst, LastSeq) ->
- JobData = create_job_data(Db, Mrst, LastSeq),
- couch_jobs:update(JTx, Job, JobData).
-
-
-finish(JTx, Job, Db, Mrst, LastSeq) ->
- JobData = create_job_data(Db, Mrst, LastSeq),
- couch_jobs:finish(JTx, Job, JobData).
-
-
set_timeout() ->
couch_jobs:set_type_timeout(?INDEX_JOB_TYPE, 6 * 1000).
-% Watcher Job api
-
-
-subscribe(Db, Mrst) ->
- JobId = create_job_id(Db, Mrst),
- couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId).
-
-
-wait(JobSubscription) ->
- case couch_jobs:wait(JobSubscription, infinity) of
- {?INDEX_JOB_TYPE, _JobId, JobState, JobData} -> {JobState, JobData};
- {timeout} -> {error, timeout}
+% Queue an indexing job for the view and wait for it via wait_for_job/2.
+% When wait_for_job/2 answers `retry` the job is queued again and the wait
+% is repeated; returns ok once wait_for_job/2 answers ok.
+build_view(Db, Mrst, UpdateSeq) ->
+    {ok, JobId} = build_view_async(Db, Mrst),
+    case wait_for_job(JobId, UpdateSeq) of
+        ok -> ok;
+        retry -> build_view(Db, Mrst, UpdateSeq)
     end.
-unsubscribe(JobSubscription) ->
-    couch_jobs:unsubscribe(JobSubscription).
+% Add an indexing job for the view to couch_jobs and return {ok, JobId}
+% without waiting for the job. The id and job data are derived from the
+% Db and Mrst arguments.
+build_view_async(Db, Mrst) ->
+    JobId = create_job_id(Db, Mrst),
+    JobData = create_job_data(Db, Mrst),
+    ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData),
+    {ok, JobId}.
+
+
+
+% Subscribe to the indexing job identified by JobId.
+% If the job has already finished, return ok when its data reports a
+% view_seq at or beyond UpdateSeq and retry otherwise. If a subscription
+% is returned instead, hand over to wait_for_job/3.
+wait_for_job(JobId, UpdateSeq) ->
+    case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of
+        {ok, Subscription, _State, _Data} ->
+            wait_for_job(JobId, Subscription, UpdateSeq);
+        {ok, finished, Data} ->
+            case Data of
+                #{view_seq := ViewSeq} when ViewSeq >= UpdateSeq ->
+                    ok;
+                _ ->
+                    retry
+            end
+    end.
-% Internal
+% Wait on an existing subscription until the job data reports a view_seq
+% at or beyond UpdateSeq.
+%   {error, Error}          raises Error via erlang:error/1
+%   finished, seq reached   ok
+%   finished, seq not there subscribe again through wait_for_job/2
+%   other state, seq reached unsubscribe and return ok
+%   anything else           keep waiting on the same subscription
+wait_for_job(JobId, Subscription, UpdateSeq) ->
+    % wait/1 is the only wait helper in this module; it already passes
+    % `infinity` to couch_jobs:wait/2.
+    case wait(Subscription) of
+        {error, Error} ->
+            erlang:error(Error);
+        {finished, #{view_seq := ViewSeq}} when ViewSeq >= UpdateSeq ->
+            ok;
+        {finished, _} ->
+            wait_for_job(JobId, UpdateSeq);
+        {_State, #{view_seq := ViewSeq}} when ViewSeq >= UpdateSeq ->
+            couch_jobs:unsubscribe(Subscription),
+            ok;
+        {_, _} ->
+            wait_for_job(JobId, Subscription, UpdateSeq)
+    end.
+% Job id for a view build: the database name followed by the view signature.
+% The first clause unpacks a db map and an #mrst{} record; the second takes
+% the two binaries directly. The name create_job_id is kept because
+% build_view_async/2 and the first clause below both call it by that name.
 create_job_id(#{name := DbName}, #mrst{sig = Sig}) ->
     create_job_id(DbName, Sig);
 create_job_id(DbName, Sig) ->
     <<DbName/binary, Sig/binary>>.
-create_job_data(Db, Mrst, LastSeq) ->
-    #{name := DbName} = Db,
-
+create_job_data(Db, Mrst) ->
     #mrst{
         idx_name = DDocId
     } = Mrst,
     #{
-        db_name => DbName,
-        ddoc_id => DDocId,
-        last_seq => LastSeq
+        db_name => fabric2_db:name(Db),
+        ddoc_id => DDocId
     }.
+
+
+% Wait on a couch_jobs subscription with an `infinity` timeout.
+% Returns {JobState, JobData} for an ?INDEX_JOB_TYPE notification, or
+% {error, timeout} when couch_jobs:wait/2 returns `timeout`.
+wait(Subscription) ->
+    case couch_jobs:wait(Subscription, infinity) of
+        {?INDEX_JOB_TYPE, _JobId, JobState, JobData} -> {JobState, JobData};
+        timeout -> {error, timeout}
+    end.