You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/07/23 21:54:52 UTC

[couchdb] 07/31: Move jobs logic to couch_views_jobs

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b16b5b25304281aa9529d663c860327ff4e168f2
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Jul 17 10:23:59 2019 -0500

    Move jobs logic to couch_views_jobs
    
    Now that the couch_jobs API is fully baked, we can remove the thin wrapper
    API in couch_views_jobs and just use couch_jobs directly.
---
 src/couch_views/src/couch_views.erl      |  80 ++++---------------
 src/couch_views/src/couch_views_jobs.erl | 131 +++++++++++++------------------
 2 files changed, 68 insertions(+), 143 deletions(-)

diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index c059204..65af1bf 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -44,86 +44,36 @@ query(Db, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
     end,
 
     Args = mrargs_to_map(QueryArgs2),
-
-    maybe_build_view(Db, MrSt, Args),
+    ok = maybe_update_view(Db, Mrst, Args),
 
     try
         couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args)
     after
         UpdateAfter = maps:get(update, Args) == lazy,
         if UpdateAfter == false -> ok; true ->
-            maybe_add_couch_job(Db, Mrst)
+            couch_views_jobs:build_view_async(Db, Mrst)
         end
     end.
 
 
-maybe_build_index(_Db, _Mrst, #{update := false}) ->
-    false;
+maybe_update_view(_Db, _Mrst, #{update := false}) ->  % update=false: read the view as-is
+    ok;
 
-maybe_build_index(_Db, _Mrst, #{update := lazy}) ->
-    false;
+maybe_update_view(_Db, _Mrst, #{update := lazy}) ->  % update=lazy: nothing to do before the read
+    ok;
 
-maybe_build_index(Db, Mrst, _Args) ->
-    {Status, Seq} = fabric2_fdb:transactional(Db, fun(TxDb) ->
-        case view_up_to_date(TxDb, Mrst) of
-            {true, UpdateSeq} ->
-                {ready, UpdateSeq};
-            {false, LatestSeq} ->
-                maybe_add_couch_job(TxDb, Mrst),
-                {false, LatestSeq}
+maybe_update_view(Db, Mrst, _Args) ->
+    WaitSeq = fabric2_fdb:transactional(Db, fun(TxDb) ->  % ready | db update seq to wait for
+        DbSeq = fabric2_db:get_update_seq(TxDb),
+        ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
+        case DbSeq == ViewSeq of
+            true -> ready;
+            false -> DbSeq
         end
     end),
 
-    if Status == ready -> true; true ->
-        subscribe_and_wait_for_index(Db, Mrst, Seq)
-    end.
-
-
-view_up_to_date(Db, Mrst) ->
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
-        UpdateSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
-        LastChange = fabric2_fdb:get_last_change(TxDb),
-        {UpdateSeq == LastChange, LastChange}
-    end).
-
-
-maybe_add_couch_job(TxDb, Mrst) ->
-    case couch_views_jobs:status(TxDb, Mrst) of
-        running ->
-            ok;
-        pending ->
-            ok;
-        Status when Status == finished orelse Status == not_found ->
-            couch_views_jobs:add(TxDb, Mrst)
-    end.
-
-
-subscribe_and_wait_for_index(Db, Mrst, Seq) ->
-    case couch_views_jobs:subscribe(Db, Mrst) of
-        {error, Error} ->
-            throw({error, Error});
-        {ok, finished, _} ->
-            ready;
-        {ok, Subscription, _JobState, _} ->
-            wait_for_index_ready(Subscription, Db, Mrst, Seq)
-    end.
-
-
-wait_for_index_ready(Subscription, Db, Mrst, Seq) ->
-    Out = couch_views_jobs:wait(Subscription),
-    case Out of
-        {finished, _JobData} ->
-            ready;
-        {pending, _JobData} ->
-            wait_for_index_ready(Subscription, Db, Mrst, Seq);
-        {running, #{last_seq := LastSeq}} ->
-            if LastSeq =< Seq -> ready; true ->
-                wait_for_index_ready(Subscription, Db, Mrst, Seq)
-            end;
-        {running, _JobData} ->
-            wait_for_index_ready(Subscription, Db, Mrst, Seq);
-        {error, Error} ->
-            throw({error, Error})
+    if WaitSeq == ready -> ok; true ->
+        couch_views_jobs:build_view(Db, Mrst, WaitSeq)  % DbSeq is local to the fun above
     end.
 
 
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index 31ab728..d9c5157 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -13,110 +13,85 @@
 -module(couch_views_jobs).
 
 -export([
-    status/2,
-    add/2,
-
-    accept/0,
-    get_job_data/1,
-    update/5,
-    finish/5,
     set_timeout/0,
-
-    subscribe/2,
-    wait/1,
-    unsubscribe/1,
-
-    create_job_id/2
+    build_view/3,
+    build_view_async/2
 ]).
 
 
 -include("couch_views.hrl").
 
 
-% Query request usage of jobs
-
-
-status(TxDb, Mrst) ->
-    JobId = create_job_id(TxDb, Mrst),
-
-    case couch_jobs:get_job_state(TxDb, ?INDEX_JOB_TYPE, JobId) of
-        {ok, State} -> State;
-        {error, not_found} -> not_found;
-        Error -> Error
-    end.
-
-
-add(TxDb, Mrst) ->
-    JobData = create_job_data(TxDb, Mrst, 0),
-
-    JobId = create_job_id(TxDb, Mrst),
-    JTx = couch_jobs_fdb:get_jtx(TxDb),
-    couch_jobs:add(JTx, ?INDEX_JOB_TYPE, JobId, JobData).
-
-
-% couch_views_worker api
-
-
-accept() ->
-    couch_jobs:accept(?INDEX_JOB_TYPE).
-
-
-get_job_data(JobId) ->
-    couch_jobs:get_job_data(undefined, ?INDEX_JOB_TYPE, JobId).
-
-
-update(JTx, Job, Db, Mrst, LastSeq) ->
-    JobData = create_job_data(Db, Mrst, LastSeq),
-    couch_jobs:update(JTx, Job, JobData).
-
-
-finish(JTx, Job, Db, Mrst, LastSeq) ->
-    JobData = create_job_data(Db, Mrst, LastSeq),
-    couch_jobs:finish(JTx, Job, JobData).
-
-
 set_timeout() ->
     couch_jobs:set_type_timeout(?INDEX_JOB_TYPE, 6 * 1000).
 
 
-% Watcher Job api
-
-
-subscribe(Db, Mrst) ->
-    JobId = create_job_id(Db, Mrst),
-    couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId).
-
-
-wait(JobSubscription) ->
-    case couch_jobs:wait(JobSubscription, infinity) of
-        {?INDEX_JOB_TYPE, _JobId, JobState, JobData} -> {JobState, JobData};
-        {timeout} -> {error, timeout}
+build_view(Db, Mrst, UpdateSeq) ->  % queue the view job, then wait on it
+    {ok, JobId} = build_view_async(Db, Mrst),
+    case wait_for_job(JobId, UpdateSeq) of
+        ok -> ok;
+        retry -> build_view(Db, Mrst, UpdateSeq)  % finished short of UpdateSeq: queue again
     end.
 
 
-unsubscribe(JobSubscription) ->
-    couch_jobs:unsubscribe(JobSubscription).
+build_view_async(Db, Mrst) ->  % queue a view build job; returns {ok, JobId} without waiting
+    JobId = create_job_id(Db, Mrst),
+    JobData = create_job_data(Db, Mrst),
+    ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData),  % crash unless add returns ok
+    {ok, JobId}.
+
+
+
+wait_for_job(JobId, UpdateSeq) ->  % returns ok | retry
+    case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of
+        {ok, Subscription, _State, _Data} ->
+            wait_for_job(JobId, Subscription, UpdateSeq);
+        {ok, finished, Data} ->  % job already finished; no subscription handle in this reply
+            case Data of
+                #{view_seq := ViewSeq} when ViewSeq >= UpdateSeq ->
+                    ok;
+                _ ->
+                    retry  % finished before reaching UpdateSeq; build_view/3 queues it again
+            end
+    end.
 
 
-% Internal
+wait_for_job(JobId, Subscription, UpdateSeq) ->
+    case wait(Subscription) of  % wait/1 already passes an infinity timeout
+        {error, Error} ->
+            erlang:error(Error);
+        {finished, #{view_seq := ViewSeq}} when ViewSeq >= UpdateSeq ->
+            ok;
+        {finished, _} ->
+            wait_for_job(JobId, UpdateSeq);  % finished short of UpdateSeq: subscribe again
+        {_State, #{view_seq := ViewSeq}} when ViewSeq >= UpdateSeq ->
+            couch_jobs:unsubscribe(Subscription),  % far enough along; stop watching the job
+            ok;
+        {_, _} ->
+            wait_for_job(JobId, Subscription, UpdateSeq)
+    end.
 
 
-create_job_id(#{name := DbName}, #mrst{sig = Sig}) ->
+create_job_id(#{name := DbName}, #mrst{sig = Sig}) ->  % unpack the db name and view signature
     create_job_id(DbName, Sig);
 
-create_job_id(DbName, Sig) ->
+create_job_id(DbName, Sig) ->  % job id is the db name followed by the signature
     <<DbName/binary, Sig/binary>>.
 
 
-create_job_data(Db, Mrst, LastSeq) ->
-    #{name := DbName} = Db,
-
+create_job_data(Db, Mrst) ->  % job payload map: db name plus design doc id
     #mrst{
         idx_name = DDocId
     } = Mrst,
 
     #{
-        db_name => DbName,
-        ddoc_id => DDocId,
-        last_seq => LastSeq
+        db_name => fabric2_db:name(Db),
+        ddoc_id => DDocId  % taken from the idx_name field of the mrst record
     }.
+
+
+wait(Subscription) ->  % returns {JobState, JobData} | {error, timeout}
+    case couch_jobs:wait(Subscription, infinity) of
+        {?INDEX_JOB_TYPE, _JobId, JobState, JobData} -> {JobState, JobData};
+        timeout -> {error, timeout}
+    end.