You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/02/07 21:06:53 UTC

[couchdb] 01/01: Use multi-transactional iterators

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch multi-transactional-iterators-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 194a8bc05a69a0501f9cf3a2c16da1a153168232
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Jan 31 14:58:41 2020 -0500

    Use multi-transactional iterators
---
 src/fabric/include/fabric2.hrl  |   5 +-
 src/fabric/src/fabric2_db.erl   |  24 ++++-
 src/fabric/src/fabric2_fdb.erl  | 217 ++++++++++++++++++++++++++++++++++++----
 src/fabric/src/fabric2_util.erl |   5 +
 4 files changed, 228 insertions(+), 23 deletions(-)

diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 828a51b..0be27fb 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -57,7 +57,10 @@
 -define(PDICT_TX_ID_KEY, '$fabric_tx_id').
 -define(PDICT_TX_RES_KEY, '$fabric_tx_result').
 -define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun').
--define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(PDICT_ITER_CHECKPOINT, '$fabric_iter_checkpoint').
+-define(PDICT_ITER_VALIDATE_DB, '$fabric_iter_validate_db').
 
+-define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(TRANSACTION_TOO_OLD, 1007).
 
 -define(BINARY_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 6d015df..7b9e726 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -211,18 +211,26 @@ list_dbs() ->
 
 
 list_dbs(Options) ->
+    TxFun = case fabric2_util:get_value(iterator, Options) of
+        true -> with_iter;
+        undefined -> transactional
+    end,
     Callback = fun(DbName, Acc) -> [DbName | Acc] end,
-    DbNames = fabric2_fdb:transactional(fun(Tx) ->
+    DbNames = fabric2_fdb:TxFun(fun(Tx) ->
         fabric2_fdb:list_dbs(Tx, Callback, [], Options)
     end),
     lists:reverse(DbNames).
 
 
 list_dbs(UserFun, UserAcc0, Options) ->
+    TxFun = case fabric2_util:get_value(iterator, Options) of
+        true -> with_iter;
+        undefined -> transactional
+    end,
     FoldFun = fun
         (DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc))
     end,
-    fabric2_fdb:transactional(fun(Tx) ->
+    fabric2_fdb:TxFun(fun(Tx) ->
         try
             UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
             UserAcc2 = fabric2_fdb:list_dbs(
@@ -755,7 +763,11 @@ fold_docs(Db, UserFun, UserAcc) ->
 
 
 fold_docs(Db, UserFun, UserAcc0, Options) ->
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
+    TxFun = case fabric2_util:get_value(iterator, Options) of
+        true -> with_iter;
+        undefined -> transactional
+    end,
+    fabric2_fdb:TxFun(Db, fun(TxDb) ->
         try
             #{
                 db_prefix := DbPrefix
@@ -829,7 +841,11 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
 
 
 fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
+    TxFun = case fabric2_util:get_value(iterator, Options) of
+        true -> with_iter;
+        undefined -> transactional
+    end,
+    fabric2_fdb:TxFun(Db, fun(TxDb) ->
         try
             #{
                 db_prefix := DbPrefix
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 6abe1f6..4c65b8e 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -24,6 +24,12 @@
     delete/1,
     exists/1,
 
+    create_iter/0,
+    create_iter/1,
+    destroy_iter/1,
+    with_iter/1,
+    with_iter/2,
+
     get_dir/1,
 
     list_dbs/4,
@@ -69,6 +75,17 @@
 -include("fabric2.hrl").
 
 
+-record(fold_acc, {  % state threaded through erlfdb:fold_range/6 by the fold_range_* helpers
+   start_key,  % passed as the Start argument of erlfdb:fold_range/6
+   end_key,    % passed as the End argument of erlfdb:fold_range/6
+   skip,       % rows still to drop before the user callback is invoked
+   opts,       % option list passed to erlfdb:fold_range/6
+   ucallback,  % user fun, called as UCallback(KV, UAcc)
+   uacc        % user accumulator; its final value is the fold result
+}).
+
+
+
 transactional(Fun) ->
     do_transaction(Fun, undefined).
 
@@ -307,6 +324,79 @@ exists(#{name := DbName} = Db) when is_binary(DbName) ->
     end.
 
 
+create_iter() ->  % returns a new erlfdb transaction configured for iterator folds
+    case get(?PDICT_ITER_CHECKPOINT) of
+        undefined -> ok;
+        _ -> error(iterator_already_created)  % a fold checkpoint is still stored in the process dictionary
+    end,
+    Fdb = get_db_handle(),
+    Tx = erlfdb:create_transaction(Fdb),
+    erlfdb:set_option(Tx, disallow_writes),
+    erlfdb:set_option(Tx, retry_limit, 0),
+    erlfdb:set_option(Tx, max_retry_delay, 0),
+    Tx.
+
+
+create_iter(#{tx := undefined} = Db) ->  % Db-handle variant: returns Db with tx bound to a new iterator Tx
+    try
+        % Refresh the handle, bind a new iterator Tx, and reopen if the reopen flag was set by the catch below.
+        Db1 = refresh(Db),
+
+        Reopen = maps:get(reopen, Db1, false),
+        Db2 = maps:remove(reopen, Db1),
+
+        Tx = create_iter(),
+
+        Db3 = case Reopen of
+            true -> reopen(Db2#{tx => Tx});
+            false -> Db2#{tx => Tx}
+        end,
+
+        % Here we might throw `reopen`
+        Db4 = ensure_current(Db3),
+
+        % This part might update the Db cache
+        ok = run_on_commit_fun(Tx),
+        erase({?PDICT_ON_COMMIT_FUN, Tx}),
+
+        % Save the initial Db handle so we can validate
+        % that the same db was still running
+        put(?PDICT_ITER_VALIDATE_DB, Db4),
+
+        Db4#{tx := Tx}
+    catch throw:{?MODULE, reopen} ->
+        create_iter(Db#{reopen => true})  % start over with the reopen flag set
+    end.
+
+
+destroy_iter({erlfdb_transaction, _}) ->  % bare Tx: erase the fold checkpoint
+    erase(?PDICT_ITER_CHECKPOINT);
+
+destroy_iter(#{tx := {erlfdb_transaction, _}} = Db) ->  % Db handle: tag must be the same erlfdb_transaction tag as above
+    #{tx := Tx} = Db,
+    erase(?PDICT_ITER_VALIDATE_DB),  % drop the handle saved by create_iter/1
+    destroy_iter(Tx),
+    Db#{tx := undefined}.  % returns the handle with its transaction unbound
+
+
+with_iter(Fun) when is_function(Fun, 1) ->  % run Fun(Tx) on an iterator transaction and return its result
+    Tx = create_iter(),
+    try
+        Fun(Tx)
+    after
+        destroy_iter(Tx)  % runs whether Fun returns or raises
+    end.
+
+
+with_iter(#{tx := undefined} = Db, Fun) when is_function(Fun, 1) ->  % run Fun(IterDb) with Db bound to an iterator Tx
+    IterDb = create_iter(Db),
+    try
+        Fun(IterDb)
+    after
+        destroy_iter(IterDb)  % runs whether Fun returns or raises
+    end.
+
+
 get_dir(Tx) ->
     Root = erlfdb_directory:root(),
     Dir = fabric2_server:fdb_directory(),
@@ -821,19 +911,50 @@ fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
     } = ensure_current(Db),
     fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
 
-fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
-    case fabric2_util:get_value(limit, Options) of
-        0 ->
+fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options) ->
+    Iterator = fabric2_util:get_value(iterator, Options),
+    case {fabric2_util:get_value(limit, Options), Iterator} of
+        {0, _} ->
             % FoundationDB treats a limit of 0 as unlimited
             % so we have to guard for that here.
-            UserAcc;
-        _ ->
-            {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
-            Callback = fun fold_range_cb/2,
-            Acc = {skip, Skip, UserCallback, UserAcc},
-            {skip, _, UserCallback, OutAcc} =
-                    erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
-            OutAcc
+            Acc;
+        {_, undefined} ->
+            FAcc = get_fold_acc(RangePrefix, Callback, Acc, Options),
+            fold_range_tx(Tx, FAcc);
+        {_, true} ->
+            FAcc = get_fold_acc(RangePrefix, Callback, Acc, Options),
+            fold_range_iter(Tx, FAcc)
+    end.
+
+
+fold_range_tx(Tx, #fold_acc{} = FAcc) ->  % fold within the given Tx; returns the final user accumulator
+    #fold_acc{
+        start_key = Start,
+        end_key = End,
+        opts = Opts
+    } = FAcc,
+    Callback = fun fold_range_tx_cb/2,
+    FAccOut = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts),
+    FAccOut#fold_acc.uacc.
+
+
+fold_range_iter(Tx, #fold_acc{} = FAcc) ->  % fold that resumes from the stored checkpoint when the Tx expires
+    #fold_acc{
+        start_key = Start,
+        end_key = End,
+        opts = Opts
+    } = FAcc,
+    Callback = fun fold_range_iter_cb/2,
+    put(?PDICT_ITER_CHECKPOINT, FAcc),  % resume point in case the Tx expires before any row is seen
+    try erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts) of
+        #fold_acc{uacc = UAccOut} -> UAccOut
+    catch
+        error:{erlfdb_error, ?TRANSACTION_TOO_OLD} ->
+            #fold_acc{} = FAcc1 = get(?PDICT_ITER_CHECKPOINT),
+            % Report through couch_log only, then resume from the checkpoint on a reset Tx.
+            couch_log:error("**** RESETTING Tx ~p FAcc1:~p~n", [Tx, FAcc1]),
+            ok = reset_iter_tx(Tx),
+            fold_range_iter(Tx, FAcc1)
+    end.
 
 
@@ -1277,7 +1398,7 @@ chunkify_binary(Data) ->
     end.
 
 
-get_fold_opts(RangePrefix, Options) ->
+get_fold_acc(RangePrefix, UserCallback, UserAcc, Options) ->
     Reverse = case fabric2_util:get_value(dir, Options) of
         rev -> true;
         _ -> false
@@ -1367,15 +1488,75 @@ get_fold_opts(RangePrefix, Options) ->
             ++ StreamingMode
             ++ Snapshot,
 
-    {StartKey3, EndKey3, Skip, OutOpts}.
+    #fold_acc{
+        start_key = StartKey3,
+        end_key = EndKey3,
+        skip = Skip,
+        opts = OutOpts,
+        ucallback = UserCallback,
+        uacc = UserAcc
+    }.
+
+
+fold_range_tx_cb(KV, #fold_acc{skip = 0} = FAcc) ->  % nothing left to skip: pass the row to the user callback
+    #fold_acc{ucallback = UCallback, uacc = UAcc} = FAcc,
+    NewUAcc = UCallback(KV, UAcc),
+    FAcc#fold_acc{uacc = NewUAcc};
+
+fold_range_tx_cb(_KV, #fold_acc{skip = N} = FAcc) when N > 0 ->  % drop this row and count it against skip
+    FAcc#fold_acc{skip = N - 1}.
+
+
+fold_range_iter_cb({K, V}, #fold_acc{skip = 0} = FAcc) ->  % pass the row to the user callback and checkpoint progress
+    #fold_acc{ucallback = UCallback, uacc = UAcc} = FAcc,
+    NewUAcc = UCallback({K, V}, UAcc),
+    NewFAcc = next_iter_acc(K, FAcc),
+    put(?PDICT_ITER_CHECKPOINT, NewFAcc),  % NOTE(review): stored checkpoint still carries UAcc, not NewUAcc -- confirm intended
+    NewFAcc#fold_acc{uacc = NewUAcc};
+
 
+fold_range_iter_cb({K, _V}, #fold_acc{skip = N} = FAcc) when N > 0 ->  % skipped row: user callback is not invoked
+    put(?PDICT_ITER_CHECKPOINT, FAcc),  % checkpoint is the state from before this row
+    next_iter_acc(K, FAcc).
 
-fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
-    NewAcc = Callback(KV, Acc),
-    {skip, 0, Callback, NewAcc};
 
-fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
-    {skip, N - 1, Callback, Acc}.
+next_iter_acc(K, #fold_acc{} = FAcc) ->  % fold state after row K: limit and skip decremented, range bound moved to K
+    #fold_acc{skip = Skip, opts = Opts} = FAcc,
+    Opts1 = case fabric2_util:get_value(limit, Opts) of
+        N when is_integer(N), N > 0 ->
+            fabric2_util:replace_value(limit, N - 1, Opts);  % argument order is (Key, Val, List)
+        undefined ->
+            Opts
+    end,
+    FAcc1 = FAcc#fold_acc{opts = Opts1, skip = max(0, Skip - 1)},
+    case fabric2_util:get_value(reverse, Opts, false) of
+        true -> FAcc1#fold_acc{end_key = K};
+        false -> FAcc1#fold_acc{start_key = K}  % NOTE(review): confirm K is not re-read when a fold resumes here
+    end.
+
+
+reset_iter_tx({erlfdb_transaction, _} = Tx) ->  % reset Tx for reuse, then re-check the saved Db handle
+    ok = erlfdb:reset(Tx),
+    erlfdb:set_option(Tx, retry_limit, 0),  % NOTE(review): create_iter/0 also sets disallow_writes -- confirm it survives reset
+    erlfdb:set_option(Tx, max_retry_delay, 0),
+    ok = iterator_db_validate(Tx, get(?PDICT_ITER_VALIDATE_DB)).
+
+
+iterator_db_validate({erlfdb_transaction, _}, undefined) ->  % no Db handle was saved: nothing to check
+    ok;
+
+iterator_db_validate({erlfdb_transaction, _} = Tx, #{} = Db) ->  % compare the stored uuid with the handle's uuid
+    #{
+        uuid := UUID,
+        name := DbName,
+        layer_prefix := LayerPrefix
+    } = Db,
+    DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+    UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
+    case erlfdb:wait(erlfdb:get(Tx, UUIDKey)) of
+        UUID -> ok;
+        _ -> error(database_does_not_exist)  % value read under UUIDKey differs from the handle's uuid
+    end.
 
 
 get_db_handle() ->
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
index 4e2e2d7..8d04219 100644
--- a/src/fabric/src/fabric2_util.erl
+++ b/src/fabric/src/fabric2_util.erl
@@ -31,6 +31,7 @@
 
     get_value/2,
     get_value/3,
+    replace_value/3,
     to_hex/1,
     from_hex/1,
     uuid/0
@@ -160,6 +161,10 @@ get_value(Key, List, Default) ->
     end.
 
 
+replace_value(Key, Val, List) ->  % replace the first {Key, _} tuple in List with {Key, Val}
+    lists:keyreplace(Key, 1, List, {Key, Val}).  % List is returned unchanged when Key is absent
+
+
 to_hex(Bin) ->
     list_to_binary(to_hex_int(Bin)).