You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by to...@apache.org on 2020/07/24 16:55:12 UTC

[couchdb] 03/03: add active_tasks for view builds using version stamps

This is an automated email from the ASF dual-hosted git repository.

tonysun83 pushed a commit to branch add_active_tasks_fdb2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a447f074dbba1417eb902f181f8f15e0da2da856
Author: Tony Sun <to...@gmail.com>
AuthorDate: Fri Jul 24 09:54:41 2020 -0700

    add active_tasks for view builds using version stamps
    
    Active Tasks requires TotalChanges and ChangesDone to show the progress
    of long running tasks. This requires count_changes_since to be
    implemented. Unfortunately, that is not easily done with
    FoundationDB. This commit replaces TotalChanges with the
    versionstamp + the number of docs as a progress indicator. This can
    possibly break existing API clients that rely on TotalChanges. ChangesDone
    will still exist, but instead of relying on the current changes seq
    it is simply a reflection of how many documents were written by the
    updater process.
---
 src/couch_views/src/couch_views_indexer.erl        |  33 ++++-
 src/couch_views/src/couch_views_util.erl           |  27 +++-
 .../test/couch_views_active_tasks_test.erl         | 155 +++++++++++++++++++++
 3 files changed, 208 insertions(+), 7 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 31868d9..9183d98 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -90,6 +90,7 @@ init() ->
         job => Job,
         job_data => Data,
         count => 0,
+        changes_done => 0,
         limiter => Limiter,
         doc_acc => [],
         design_opts => Mrst#mrst.design_opts
@@ -132,7 +133,9 @@ upgrade_data(Data) ->
             true -> Acc;
             false -> maps:put(Key, Default, Acc)
         end
-    end, Data, Defaults).
+    end, Data, Defaults),
+    % initialize active task
+    fabric2_active_tasks:update_active_task_info(Data, #{}).
 
 
 % Transaction limit exceeded don't retry
@@ -191,7 +194,8 @@ do_update(Db, Mrst0, State0) ->
             last_seq := LastSeq,
             limit := Limit,
             limiter := Limiter,
-            view_vs := ViewVS
+            view_vs := ViewVS,
+            changes_done := ChangesDone0
         } = State2,
         DocAcc1 = fetch_docs(TxDb, DocAcc),
         couch_rate:in(Limiter, Count),
@@ -199,13 +203,16 @@ do_update(Db, Mrst0, State0) ->
         {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
         WrittenDocs = write_docs(TxDb, Mrst1, MappedDocs, State2),
 
+        ChangesDone = ChangesDone0 + WrittenDocs,
+
         couch_rate:success(Limiter, WrittenDocs),
 
         case Count < Limit of
             true ->
                 maybe_set_build_status(TxDb, Mrst1, ViewVS,
                     ?INDEX_READY),
-                report_progress(State2, finished),
+                report_progress(State2#{changes_done := ChangesDone},
+                    finished),
                 {Mrst1, finished};
             false ->
                 State3 = report_progress(State2, update),
@@ -213,6 +220,7 @@ do_update(Db, Mrst0, State0) ->
                     tx_db := undefined,
                     count := 0,
                     doc_acc := [],
+                    changes_done := ChangesDone,
                     view_seq := LastSeq
                 }}
         end
@@ -483,7 +491,9 @@ report_progress(State, UpdateType) ->
         tx_db := TxDb,
         job := Job1,
         job_data := JobData,
-        last_seq := LastSeq
+        last_seq := LastSeq,
+        db_seq := DBSeq,
+        changes_done := ChangesDone
     } = State,
 
     #{
@@ -494,9 +504,18 @@ report_progress(State, UpdateType) ->
         <<"retries">> := Retries
     } = JobData,
 
+    ActiveTasks = fabric2_active_tasks:get_active_task_info(JobData),
+    TotalDone = case maps:get(<<"changes_done">>, ActiveTasks, 0) of
+        0 -> ChangesDone;
+        N -> N + ChangesDone
+    end,
+
+    NewActiveTasks = couch_views_util:active_tasks_info(TotalDone,
+        DbName, DDocId, LastSeq, DBSeq),
+
     % Reconstruct from scratch to remove any
     % possible existing error state.
-    NewData = #{
+    NewData0 = #{
         <<"db_name">> => DbName,
         <<"db_uuid">> => DbUUID,
         <<"ddoc_id">> => DDocId,
@@ -504,6 +523,8 @@ report_progress(State, UpdateType) ->
         <<"view_seq">> => LastSeq,
         <<"retries">> => Retries
     },
+    NewData = fabric2_active_tasks:update_active_task_info(NewData0,
+        NewActiveTasks),
 
     case UpdateType of
         update ->
@@ -540,4 +561,4 @@ key_size_limit() ->
 
 
 value_size_limit() ->
-    config:get_integer("couch_views", "value_size_limit", ?VALUE_SIZE_LIMIT).
+    config:get_integer("couch_views", "value_size_limit", ?VALUE_SIZE_LIMIT).
\ No newline at end of file
diff --git a/src/couch_views/src/couch_views_util.erl b/src/couch_views/src/couch_views_util.erl
index 154e9e2..11bba75 100644
--- a/src/couch_views/src/couch_views_util.erl
+++ b/src/couch_views/src/couch_views_util.erl
@@ -17,7 +17,8 @@
     ddoc_to_mrst/2,
     validate_args/1,
     validate_args/2,
-    is_paginated/1
+    is_paginated/1,
+    active_tasks_info/5
 ]).
 
 
@@ -276,3 +277,27 @@ is_paginated(#mrargs{page_size = PageSize}) when is_integer(PageSize) ->
 
 is_paginated(_) ->
     false.
+
+
+% Builds the active-task map reported for a view indexer job. LastSeq and
+% DBSeq are rendered through convert_seq_to_stamp/1 before being stored.
+active_tasks_info(ChangesDone, DbName, DDocId, LastSeq, DBSeq) ->
+    CurrentStamp = convert_seq_to_stamp(LastSeq),
+    DbStamp = convert_seq_to_stamp(DBSeq),
+    #{
+        <<"type">> => <<"indexer">>,
+        <<"database">> => DbName,
+        <<"design_document">> => DDocId,
+        <<"changes_done">> => ChangesDone,
+        <<"current_version_stamp">> => CurrentStamp,
+        <<"db_version_stamp">> => DbStamp
+    }.
+
+
+% Renders a sequence as a <<"Stamp-Batch-DocNumber">> binary. The <<"0">>
+% sequence and an undefined sequence both map to <<"0-0-0">>.
+convert_seq_to_stamp(Seq) when Seq =:= <<"0">>; Seq =:= undefined ->
+    <<"0-0-0">>;
+
+convert_seq_to_stamp(Seq) ->
+    {_, Stamp, Batch, DocNumber} = fabric2_fdb:seq_to_vs(Seq),
+    iolist_to_binary([
+        integer_to_list(Stamp), $-,
+        integer_to_list(Batch), $-,
+        integer_to_list(DocNumber)
+    ]).
diff --git a/src/couch_views/test/couch_views_active_tasks_test.erl b/src/couch_views/test/couch_views_active_tasks_test.erl
new file mode 100644
index 0000000..f87e010
--- /dev/null
+++ b/src/couch_views/test/couch_views_active_tasks_test.erl
@@ -0,0 +1,155 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_active_tasks_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_views/include/couch_views.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+-define(MAP_FUN1, <<"map_fun1">>).
+-define(MAP_FUN2, <<"map_fun2">>).
+-define(INDEX_FOO, <<"_design/foo">>).
+-define(INDEX_BAR, <<"_design/bar">>).
+-define(TOTAL_DOCS, 1000).
+
+
+% Starts the applications listed below and returns the resulting test
+% context, which cleanup/1 passes back to test_util:stop_couch/1.
+setup() ->
+    test_util:start_couch([
+        fabric,
+        couch_jobs,
+        couch_js,
+        couch_views
+    ]).
+
+
+% Stops what setup/0 started, using the context it returned.
+cleanup(StartedCtx) ->
+    test_util:stop_couch(StartedCtx).
+
+
+% Per-test fixture: creates a temporary database as the admin user and
+% writes the ?INDEX_FOO design doc together with ?TOTAL_DOCS documents.
+% Returns {Db, DDoc} for the test body and for foreach_teardown/1.
+foreach_setup() ->
+    DbOpts = [{user_ctx, ?ADMIN_USER}],
+    {ok, Db} = fabric2_db:create(?tempdb(), DbOpts),
+    DDoc = create_ddoc(?INDEX_FOO, ?MAP_FUN1),
+    fabric2_db:update_docs(Db, [DDoc | make_docs(?TOTAL_DOCS)]),
+    {Db, DDoc}.
+
+
+% Per-test cleanup: unloads all meck mocks, then deletes the database
+% created by foreach_setup/0.
+foreach_teardown({Db, _DDoc}) ->
+    meck:unload(),
+    DbName = fabric2_db:name(Db),
+    ok = fabric2_db:delete(DbName, []).
+
+
+% EUnit generator: a suite-wide setup/cleanup fixture wrapping a foreach
+% fixture that gives every test case its own database.
+active_tasks_test_() ->
+    Tests = [
+        ?TDEF_FE(verify_basic_active_tasks),
+        ?TDEF_FE(verify_muliple_active_tasks)
+    ],
+    PerTest = {foreach, fun foreach_setup/0, fun foreach_teardown/1, Tests},
+    Suite = {setup, fun setup/0, fun cleanup/1, PerTest},
+    {"Active Tasks test", Suite}.
+
+
+% Builds one index with the indexer paused at the point where it reports
+% ?TOTAL_DOCS changes, reads the active tasks while it is blocked, and
+% checks the progress recorded for the single task.
+verify_basic_active_tasks({Db, DDoc}) ->
+    pause_indexer_for_changes(self()),
+    couch_views:build_indices(Db, [DDoc]),
+    {IndexerPid, {changes_done, ChangesDone}} = wait_to_reach_changes(10000),
+    ActiveTasks = fabric2_active_tasks:get_active_tasks(),
+    % Release the indexer before checking anything, so a failed check
+    % cannot leave it blocked inside the mocked active_tasks_info/5.
+    IndexerPid ! continue,
+    ?assertMatch([_], ActiveTasks),
+    [ActiveTask] = ActiveTasks,
+    ChangesDone1 = maps:get(<<"changes_done">>, ActiveTask),
+    % we assume the indexer has run for a bit so it has to be > 0
+    ?assert(ChangesDone1 > 0),
+    ?assert(ChangesDone1 =< ChangesDone),
+    % EUnit's assertEqual takes (Expected, Actual).
+    ?assertEqual(?TOTAL_DOCS, ChangesDone).
+
+
+% Builds two indexes on the same database with both indexers paused at
+% the point where they report ?TOTAL_DOCS changes, then checks that two
+% active tasks are listed and that each indexer reported ?TOTAL_DOCS.
+verify_muliple_active_tasks({Db, DDoc}) ->
+    DDoc2 = create_ddoc(?INDEX_BAR, ?MAP_FUN2),
+    fabric2_db:update_doc(Db, DDoc2, []),
+    pause_indexer_for_changes(self()),
+    couch_views:build_indices(Db, [DDoc, DDoc2]),
+
+    {IndexerPid, {changes_done, ChangesDone}} = wait_to_reach_changes(10000),
+    {IndexerPid2, {changes_done, ChangesDone2}} = wait_to_reach_changes(10000),
+
+    ActiveTasks = fabric2_active_tasks:get_active_tasks(),
+
+    % Release both indexers before asserting, so a failed check cannot
+    % leave them blocked inside the mocked active_tasks_info/5.
+    IndexerPid ! continue,
+    IndexerPid2 ! continue,
+
+    % EUnit's assertEqual takes (Expected, Actual).
+    ?assertEqual(2, length(ActiveTasks)),
+    ?assertEqual(?TOTAL_DOCS, ChangesDone),
+    ?assertEqual(?TOTAL_DOCS, ChangesDone2).
+
+
+% Returns a design document with id DDocId holding a single view named
+% IndexName whose map function emits doc.val as both key and value.
+create_ddoc(DDocId, IndexName) ->
+    MapSrc = <<"function(doc) {emit(doc.val, doc.val);}">>,
+    View = {IndexName, {[{<<"map">>, MapSrc}]}},
+    couch_doc:from_json_obj({[
+        {<<"_id">>, DDocId},
+        {<<"views">>, {[View]}}
+    ]}).
+
+
+% Returns a document whose _id is the decimal text of the integer Id and
+% whose "val" field is Val.
+doc(Id, Val) ->
+    DocId = integer_to_binary(Id),
+    couch_doc:from_json_obj({[{<<"_id">>, DocId}, {<<"val">>, Val}]}).
+
+
+% Returns Count documents with ids 1..Count; every document's "val" is
+% Count itself.
+make_docs(Count) ->
+    lists:map(fun(I) -> doc(I, Count) end, lists:seq(1, Count)).
+
+
+% Mocks couch_views_util:active_tasks_info/5 with a passthrough wrapper.
+% When it is called with ChangesDone equal to ?TOTAL_DOCS, the calling
+% process notifies ParentPid and blocks until it receives `continue`.
+% Every call then falls through to the real implementation.
+pause_indexer_for_changes(ParentPid) ->
+    meck:new(couch_views_util, [passthrough]),
+    Pause = fun
+        (?TOTAL_DOCS = Done) ->
+            ParentPid ! {self(), {changes_done, Done}},
+            receive continue -> ok end;
+        (_) ->
+            ok
+    end,
+    meck:expect(couch_views_util, active_tasks_info,
+        fun(ChangesDone, DbName, DDocId, LastSeq, DBSeq) ->
+            Pause(ChangesDone),
+            meck:passthrough([ChangesDone, DbName, DDocId, LastSeq, DBSeq])
+        end).
+
+
+% Waits up to Timeout milliseconds for a {Pid, {changes_done, N}} message
+% and returns it unchanged; raises
+% error(timeout_in_pause_indexer_for_changes) if none arrives in time.
+wait_to_reach_changes(Timeout) ->
+    receive
+        {Pid, {changes_done, _} = Progress} when is_pid(Pid) ->
+            {Pid, Progress}
+    after Timeout ->
+        error(timeout_in_pause_indexer_for_changes)
+    end.