You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2020/04/02 12:39:33 UTC

[couchdb] 05/18: Add couch_views_indexer build to creation versionstamp

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch fdb-mango-indexes
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9dfe9e53532d8193ff5ee7a5482437879c95a851
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Mar 23 14:28:37 2020 +0200

    Add couch_views_indexer build to creation versionstamp
    
    This records a versionstamp for when an index was created, along
    with a build status for indexes. If the index has a creation_vs, then
    couch_views_indexer will build the index up to that creation versionstamp.
---
 src/couch_views/include/couch_views.hrl            |  6 ++
 src/couch_views/src/couch_views_fdb.erl            | 76 +++++++++++++++++++++
 src/couch_views/src/couch_views_indexer.erl        | 77 ++++++++++++++++------
 src/couch_views/src/couch_views_jobs.erl           | 21 ++++--
 src/couch_views/test/couch_views_indexer_test.erl  | 42 +++++++++++-
 .../test/couch_views_trace_index_test.erl          |  5 +-
 src/fabric/src/fabric2_fdb.erl                     | 12 ++--
 7 files changed, 206 insertions(+), 33 deletions(-)

diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index c40bb02..3d0110f 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -18,6 +18,8 @@
 -define(VIEW_UPDATE_SEQ, 0).
 -define(VIEW_ROW_COUNT, 1).
 -define(VIEW_KV_SIZE, 2).
+-define(VIEW_BUILD_STATUS, 3). % value: ?INDEX_BUILDING | ?INDEX_READY
+-define(VIEW_CREATION_VS, 4). % value: versionstamp recorded when the index was created
 
 % Data keys
 -define(VIEW_ID_RANGE, 0).
@@ -25,3 +27,7 @@
 
 % jobs api
 -define(INDEX_JOB_TYPE, <<"views">>).
+
+% Index build status values, stored under the ?VIEW_BUILD_STATUS key
+-define(INDEX_BUILDING, <<"building">>).
+-define(INDEX_READY, <<"ready">>).
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index a0224b2..3b008d4 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -13,6 +13,12 @@
 -module(couch_views_fdb).
 
 -export([
+    new_interactive_index/3,
+    new_creation_vs/3,
+    get_creation_vs/2,
+    get_build_status/2,
+    set_build_status/3,
+
     get_update_seq/2,
     set_update_seq/3,
 
@@ -39,6 +45,60 @@
 -include_lib("fabric/include/fabric2.hrl").
 
 
+new_interactive_index(Db, Mrst, VS) -> % record the creation VS, then mark the build status as building
+    couch_views_fdb:new_creation_vs(Db, Mrst, VS), % Db must carry tx and db_prefix (used by both helpers)
+    couch_views_fdb:set_build_status(Db, Mrst, ?INDEX_BUILDING).
+
+
+% Interactive view creation versionstamp
+% (<db>, ?DB_VIEWS, ?VIEW_INFO, ?VIEW_CREATION_VS, Sig) = VS
+% Writes VS with set_versionstamped_value; presumably VS is incomplete and filled in at commit.
+new_creation_vs(TxDb, #mrst{} = Mrst, VS) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    Key = creation_vs_key(TxDb, Mrst#mrst.sig),
+    Value = erlfdb_tuple:pack_vs({VS}), % packed as a 1-tuple; get_creation_vs/2 unpacks {VS}
+    ok = erlfdb:set_versionstamped_value(Tx, Key, Value).
+
+% Returns the creation versionstamp for an #mrst{} or a raw Sig, or not_found if none is stored.
+get_creation_vs(TxDb, #mrst{} = Mrst) ->
+    get_creation_vs(TxDb, Mrst#mrst.sig);
+
+get_creation_vs(TxDb, Sig) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    Key = creation_vs_key(TxDb, Sig),
+    case erlfdb:wait(erlfdb:get(Tx, Key)) of
+        not_found ->
+            not_found;
+        EK ->
+            {VS} = erlfdb_tuple:unpack(EK), % stored as a packed 1-tuple by new_creation_vs/3
+            VS
+    end.
+
+
+% Interactive view build status
+% (<db>, ?DB_VIEWS, ?VIEW_INFO, ?VIEW_BUILD_STATUS, Sig) = ?INDEX_BUILDING | ?INDEX_READY
+% get_build_status/2 returns the stored status value, or not_found if none was set.
+get_build_status(TxDb, #mrst{sig = Sig}) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    Key = build_status_key(TxDb, Sig),
+    erlfdb:wait(erlfdb:get(Tx, Key)).
+
+% Stores State as the build status; callers pass ?INDEX_BUILDING or ?INDEX_READY (not validated).
+set_build_status(TxDb, #mrst{sig = Sig}, State) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    Key = build_status_key(TxDb, Sig),
+    ok = erlfdb:set(Tx, Key, State).
+
+
 % View Build Sequence Access
 % (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
 
@@ -340,6 +400,22 @@ map_idx_range(DbPrefix, Sig, ViewId, MapKey, DocId) ->
     erlfdb_tuple:range(Key, DbPrefix).
 
 
+creation_vs_key(Db, Sig) -> % packed FDB key holding the view's creation versionstamp
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+    Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_CREATION_VS, Sig},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+% Packed FDB key holding the view's build status.
+build_status_key(Db, Sig) ->
+    #{
+        db_prefix := DbPrefix
+    } = Db,
+    Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_BUILD_STATUS, Sig},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
 process_rows(Rows) ->
     Encoded = lists:map(fun({K, V}) ->
         EK1 = couch_views_encoding:encode(K, key),
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 0127bac..ab5aaad 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -18,7 +18,9 @@
 
 
 -export([
-    init/0
+    init/0,
+    map_docs/2,
+    write_docs/4
 ]).
 
 -ifdef(TEST).
@@ -80,6 +82,7 @@ init() ->
         db_seq => undefined,
         view_seq => undefined,
         last_seq => undefined,
+        view_vs => undefined,
         job => Job,
         job_data => Data,
         count => 0,
@@ -174,22 +177,7 @@ update(#{} = Db, Mrst0, State0) ->
 
 do_update(Db, Mrst0, State0) ->
     fabric2_fdb:transactional(Db, fun(TxDb) ->
-        % In the first iteration of update we need
-        % to populate our db and view sequences
-        State1 = case State0 of
-            #{db_seq := undefined} ->
-                ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst0),
-                State0#{
-                    tx_db := TxDb,
-                    db_seq := fabric2_db:get_update_seq(TxDb),
-                    view_seq := ViewSeq,
-                    last_seq := ViewSeq
-                };
-            _ ->
-                State0#{
-                    tx_db := TxDb
-                }
-        end,
+        State1 = get_update_start_state(TxDb, Mrst0, State0),
 
         {ok, State2} = fold_changes(State1),
 
@@ -198,7 +186,8 @@ do_update(Db, Mrst0, State0) ->
             doc_acc := DocAcc,
             last_seq := LastSeq,
             limit := Limit,
-            limiter := Limiter
+            limiter := Limiter,
+            view_vs := ViewVS
         } = State2,
         DocAcc1 = fetch_docs(TxDb, DocAcc),
         couch_rate:in(Limiter, Count),
@@ -210,6 +199,8 @@ do_update(Db, Mrst0, State0) ->
 
         case Count < Limit of
             true ->
+                maybe_set_build_status(TxDb, Mrst1, ViewVS,
+                    ?INDEX_READY),
                 report_progress(State2, finished),
                 {Mrst1, finished};
             false ->
@@ -224,6 +215,33 @@ do_update(Db, Mrst0, State0) ->
     end).
 
 
+maybe_set_build_status(_TxDb, _Mrst1, not_found, _State) -> % no creation VS: leave build status untouched
+    ok;
+
+maybe_set_build_status(TxDb, Mrst1, _ViewVS, State) -> % index has a creation VS: record State
+    couch_views_fdb:set_build_status(TxDb, Mrst1, State).
+
+
+% On the first update iteration (db_seq still undefined) populate the db seq,
+% view seq and creation versionstamp; later iterations only refresh tx_db.
+get_update_start_state(TxDb, Mrst, #{db_seq := undefined} = State) ->
+    ViewVS = couch_views_fdb:get_creation_vs(TxDb, Mrst),
+    ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
+
+    State#{
+        tx_db := TxDb,
+        db_seq := fabric2_db:get_update_seq(TxDb),
+        view_vs := ViewVS, % not_found when no creation VS was recorded
+        view_seq := ViewSeq,
+        last_seq := ViewSeq
+    };
+
+get_update_start_state(TxDb, _Idx, State) ->
+    State#{
+        tx_db := TxDb
+    }.
+
+
 fold_changes(State) ->
     #{
         view_seq := SinceSeq,
@@ -240,7 +258,8 @@ process_changes(Change, Acc) ->
     #{
         doc_acc := DocAcc,
         count := Count,
-        design_opts := DesignOpts
+        design_opts := DesignOpts,
+        view_vs := ViewVS
     } = Acc,
 
     #{
@@ -263,8 +282,22 @@ process_changes(Change, Acc) ->
                 last_seq := LastSeq
             }
     end,
-    {ok, Acc1}.
 
+    DocVS = fabric2_fdb:seq_to_vs(LastSeq),
+
+    Go = maybe_stop_at_vs(ViewVS, DocVS),
+    {Go, Acc1}.
+
+% Stop folding changes once a doc's versionstamp reaches the index creation VS.
+maybe_stop_at_vs({versionstamp, _, _, _} = ViewVS, DocVS) when DocVS >= ViewVS ->
+    stop; % versionstamps are 4-tuples {versionstamp, VS, Batch, TxId}; a 2-tuple pattern never matched
+
+maybe_stop_at_vs(_, _) -> % no creation VS (not_found), or not reached yet: keep folding
+    ok.
+
+
+map_docs(Mrst, []) ->
+    {Mrst, []};
 
 map_docs(Mrst, Docs) ->
     % Run all the non deleted docs through the view engine and
@@ -328,7 +361,9 @@ write_docs(TxDb, Mrst, Docs, State) ->
         N + 1
     end, 0, Docs),
 
-    couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq),
+    if LastSeq == false -> ok; true ->
+        couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq)
+    end,
     DocsNumber.
 
 
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index 1604841..b97e7ce 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -40,11 +40,12 @@ build_view(TxDb, Mrst, UpdateSeq) ->
     end.
 
 
-build_view_async(TxDb, Mrst) ->
-    JobId = job_id(TxDb, Mrst),
-    JobData = job_data(TxDb, Mrst),
-    DbUUID = fabric2_db:get_uuid(TxDb),
-    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+build_view_async(TxDb0, Mrst) ->
+    JobId = job_id(TxDb0, Mrst),
+    JobData = job_data(TxDb0, Mrst),
+    DbUUID = fabric2_db:get_uuid(TxDb0),
+    TxDb1 = ensure_correct_tx(TxDb0),
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(TxDb1), fun(JTx) ->
         case couch_jobs:get_job_data(JTx, ?INDEX_JOB_TYPE, JobId) of
             {error, not_found} ->
                 ok;
@@ -59,6 +60,16 @@ build_view_async(TxDb, Mrst) ->
     {ok, JobId}.
 
 
+ensure_correct_tx(#{tx := undefined} = TxDb) -> % no transaction attached: use as-is
+    TxDb;
+
+ensure_correct_tx(#{tx := Tx} = TxDb) -> % read-only tx: detach it (presumably so get_jtx/1 opens a writable one)
+    case erlfdb:is_read_only(Tx) of
+        true -> TxDb#{tx := undefined};
+        false -> TxDb
+    end.
+
+
 wait_for_job(JobId, UpdateSeq) ->
     case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of
         {ok, Subscription, _State, _Data} ->
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index 43b5828..8ddb64b 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -51,7 +51,8 @@ indexer_test_() ->
                     ?TDEF_FE(index_autoupdater_callback),
                     ?TDEF_FE(handle_db_recreated_when_running),
                     ?TDEF_FE(handle_db_recreated_after_finished),
-                    ?TDEF_FE(index_budget_is_changing)
+                    ?TDEF_FE(index_budget_is_changing),
+                    ?TDEF_FE(index_can_recover_from_crash, 60)
                 ]
             }
         }
@@ -508,6 +509,41 @@ handle_db_recreated_after_finished(Db) ->
     ], Out2).
 
 
+index_can_recover_from_crash(Db) ->
+    ok = meck:new(config, [passthrough]),
+    ok = meck:expect(config, get_integer, fun(Section, Key, Default) -> % index one change per batch
+        case Section =:= "couch_views" andalso Key =:= "change_limit" of
+            true -> 1;
+            _ -> Default
+        end
+    end),
+    ok = meck:new(couch_eval, [passthrough]),
+    ok = meck:expect(couch_eval, map_docs, fun(State, Docs) -> % crash once, on doc <<"2">>
+        Doc = hd(Docs),
+        case Doc#doc.id =:= <<"2">> of
+            true ->
+                % remove the mock so that next time the doc is processed
+                % it will work
+                meck:unload(couch_eval),
+                throw({fake_crash, test_jobs_restart});
+            false ->
+                meck:passthrough([State, Docs])
+        end
+    end),
+
+    DDoc = create_ddoc(),
+    Docs = make_docs(3),
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, _} = fabric2_db:update_docs(Db, Docs, []),
+
+    {ok, Out} = run_query(Db, DDoc, ?MAP_FUN1),
+    ?assertEqual([
+        row(<<"1">>, 1, 1),
+        row(<<"2">>, 2, 2),
+        row(<<"3">>, 3, 3)
+    ], Out).
+
+
 row(Id, Key, Value) ->
     {row, [
         {id, Id},
@@ -603,6 +639,10 @@ create_ddoc(multi_emit_key_limit) ->
     ]}).
 
 
+make_docs(Count) -> % doc(1) .. doc(Count), in ascending order
+    lists:map(fun doc/1, lists:seq(1, Count)).
+
+
 doc(Id) ->
     doc(Id, Id).
 
diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl
index f8a5ce5..5b15a4c 100644
--- a/src/couch_views/test/couch_views_trace_index_test.erl
+++ b/src/couch_views/test/couch_views_trace_index_test.erl
@@ -77,10 +77,13 @@ trace_single_doc(Db) ->
     {ok, _} = fabric2_db:update_doc(Db, Doc, []),
     {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
 
+    HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
     JobData = #{
         <<"db_name">> => DbName,
+        <<"db_uuid">> => fabric2_db:get_uuid(Db),
         <<"ddoc_id">> => <<"_design/bar">>,
-        <<"sig">> => fabric2_util:to_hex(Mrst#mrst.sig)
+        <<"sig">> => HexSig,
+        <<"retries">> => 0
     },
     meck:expect(couch_jobs, accept, 2, {ok, job, JobData}),
     meck:expect(couch_jobs, update, 3, {ok, job}),
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 912d4df..2295a56 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -64,6 +64,8 @@
     seq_to_vs/1,
     next_vs/1,
 
+    new_versionstamp/1,
+
     debug_cluster/0,
     debug_cluster/2
 ]).
@@ -1021,6 +1023,11 @@ next_vs({versionstamp, VS, Batch, TxId}) ->
     {versionstamp, V, B, T}.
 
 
+new_versionstamp(Tx) -> % all-ones VS/batch: presumably erlfdb's incomplete-versionstamp placeholder
+    TxId = erlfdb:get_next_tx_id(Tx),
+    {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
+
+
 debug_cluster() ->
     debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
 
@@ -1763,11 +1770,6 @@ get_transaction_id(Tx, LayerPrefix) ->
     end.
 
 
-new_versionstamp(Tx) ->
-    TxId = erlfdb:get_next_tx_id(Tx),
-    {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
-
-
 on_commit(Tx, Fun) when is_function(Fun, 0) ->
     % Here we rely on Tx objects matching. However they contain a nif resource
     % object. Before Erlang 20.0 those would have been represented as empty