You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/02/18 22:54:43 UTC

[couchdb] 01/01: Let couch_jobs use its own metadata key

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch couch_jobs_uses_its_own_metadata
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1979d85ae2db879aa04a89bcac83125a09208362
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Feb 18 17:20:15 2020 -0500

    Let couch_jobs use its own metadata key
    
    Previously, if the metadata key is bumped in a transaction the same transaction
    could not be used to add jobs with `couch_jobs`. That's because metadata is a
    versionstamped value, and when set, it cannot be read back until that
    transaction has committed. In `fabric2_fdb` there is a process dict key that is
    set which declares that metadata was already read, which happens before any db
    update, however `couch_jobs` uses its own caching mechanism and doesn't know
    about that pdict key.
    
    It would be tempting to re-use the same pdict key and just set it from
    whichever module calls `ensure_current`, however that would be incorrect, as
    the two modules don't actually share the same `Db` handle objects, so they do
    need to re-read the most recent key.
    
    Ideally we'd implement a single `couch_fdb` module to be shared between
    `couch_jobs` and `fabric2_db` but until then it may be simpler to just let
    `couch_jobs` use its own metadata key. This way, it doesn't get invalidated or
    bumped every time dbs get recreated or design docs are updated. The only time
    it would be bumped is if the fdb layer prefix changed at runtime.
---
 src/couch_jobs/src/couch_jobs.hrl        |  1 +
 src/couch_jobs/src/couch_jobs_fdb.erl    | 27 +++++++++++++++++++++++----
 src/couch_jobs/test/couch_jobs_tests.erl | 22 +++++++++++++++++++++-
 3 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl
index 055bf09..bb561b1 100644
--- a/src/couch_jobs/src/couch_jobs.hrl
+++ b/src/couch_jobs/src/couch_jobs.hrl
@@ -35,6 +35,7 @@
 -define(ACTIVITY, 6).
 
 
+-define(COUCH_JOBS_MD_VERSION, <<"couch_jobs_md_version">>).
 -define(COUCH_JOBS_EVENT, '$couch_jobs_event').
 -define(COUCH_JOBS_CURRENT, '$couch_jobs_current').
 -define(UNDEFINED_MAX_SCHEDULED_TIME, 1 bsl 36).
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
index a08b78f..a81a313 100644
--- a/src/couch_jobs/src/couch_jobs_fdb.erl
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -46,7 +46,10 @@
     tx/2,
 
     get_job/2,
-    get_jobs/0
+    get_jobs/0,
+
+    bump_metadata_version/0,
+    bump_metadata_version/1
 ]).
 
 
@@ -485,6 +488,19 @@ get_jobs() ->
     end).
 
 
+% Call this function if the top level "couchdb" FDB directory layer
+% changes.
+%
+bump_metadata_version() ->
+    fabric2_fdb:transactional(fun(Tx) ->
+        bump_metadata_version(Tx)
+    end).
+
+
+bump_metadata_version(Tx) ->
+    erlfdb:set_versionstamped_value(Tx, ?COUCH_JOBS_MD_VERSION, <<0:112>>).
+
+
 % Private helper functions
 
 maybe_enqueue(#{jtx := true} = JTx, Type, JobId, STime, Resubmit, Data) ->
@@ -617,7 +633,6 @@ init_jtx(undefined) ->
 init_jtx({erlfdb_transaction, _} = Tx) ->
     LayerPrefix = fabric2_fdb:get_dir(Tx),
     Jobs = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
-    Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
     % layer_prefix, md_version and tx here match db map fields in fabric2_fdb
     % but we also assert that this is a job transaction using the jtx => true
     % field
@@ -626,7 +641,7 @@ init_jtx({erlfdb_transaction, _} = Tx) ->
         tx => Tx,
         layer_prefix => LayerPrefix,
         jobs_path => Jobs,
-        md_version => Version
+        md_version => get_metadata_version(Tx)
     }.
 
 
@@ -641,13 +656,17 @@ ensure_current(#{jtx := true, tx := Tx} = JTx) ->
     end.
 
 
+get_metadata_version({erlfdb_transaction, _} = Tx) ->
+    erlfdb:wait(erlfdb:get_ss(Tx, ?COUCH_JOBS_MD_VERSION)).
+
+
 update_current(#{tx := Tx, md_version := Version} = JTx) ->
     case get_md_version_age(Version) of
         Age when Age =< ?MD_VERSION_MAX_AGE_SEC ->
             % Looked it up not too long ago. Avoid looking it up to frequently
             JTx;
         _ ->
-            case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of
+            case get_metadata_version(Tx) of
                 Version ->
                     update_md_version_timestamp(Version),
                     JTx;
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
index a7e085e..62a75c8 100644
--- a/src/couch_jobs/test/couch_jobs_tests.erl
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -54,7 +54,8 @@ couch_jobs_basic_test_() ->
                     fun enqueue_inactive/1,
                     fun remove_running_job/1,
                     fun check_get_jobs/1,
-                    fun use_fabric_transaction_object/1
+                    fun use_fabric_transaction_object/1,
+                    fun metadata_version_bump/1
                 ]
             }
         }
@@ -604,3 +605,22 @@ use_fabric_transaction_object(#{t1 := T1, j1 := J1, dbname := DbName}) ->
         ok = couch_jobs:remove(#{tx => undefined}, T1, J1),
         ok = fabric2_db:delete(DbName, [])
     end).
+
+
+metadata_version_bump(_) ->
+    ?_test(begin
+        JTx1 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
+        ?assertMatch(#{md_version := not_found}, JTx1),
+
+        ets:delete_all_objects(couch_jobs_fdb),
+        couch_jobs_fdb:bump_metadata_version(),
+        JTx2 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
+        ?assertMatch(#{md_version := Bin} when is_binary(Bin), JTx2),
+
+        ets:delete_all_objects(couch_jobs_fdb),
+        couch_jobs_fdb:bump_metadata_version(),
+        JTx3 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
+        OldMdv = maps:get(md_version, JTx2),
+        NewMdv = maps:get(md_version, JTx3),
+        ?assert(NewMdv > OldMdv)
+    end).