You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/02/07 21:06:52 UTC

[couchdb] branch multi-transactional-iterators-2 created (now 194a8bc)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch multi-transactional-iterators-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 194a8bc  Use multi-transactional iterators

This branch includes the following new commits:

     new 194a8bc  Use multi-transactional iterators

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: Use multi-transactional iterators

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch multi-transactional-iterators-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 194a8bc05a69a0501f9cf3a2c16da1a153168232
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Jan 31 14:58:41 2020 -0500

    Use multi-transactional iterators
---
 src/fabric/include/fabric2.hrl  |   5 +-
 src/fabric/src/fabric2_db.erl   |  24 ++++-
 src/fabric/src/fabric2_fdb.erl  | 217 ++++++++++++++++++++++++++++++++++++----
 src/fabric/src/fabric2_util.erl |   5 +
 4 files changed, 228 insertions(+), 23 deletions(-)

diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 828a51b..0be27fb 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -57,7 +57,10 @@
 -define(PDICT_TX_ID_KEY, '$fabric_tx_id').
 -define(PDICT_TX_RES_KEY, '$fabric_tx_result').
 -define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun').
--define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(PDICT_ITER_CHECKPOINT, '$fabric_iter_checkpoint').
+-define(PDICT_ITER_VALIDATE_DB, '$fabric_iter_validate_db').
 
+-define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(TRANSACTION_TOO_OLD, 1007).
 
 -define(BINARY_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 6d015df..7b9e726 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -211,18 +211,26 @@ list_dbs() ->
 
 
 list_dbs(Options) ->
+    TxFun = case fabric2_util:get_value(iterator, Options) of
+        true -> with_iter;
+        undefined -> transactional
+    end,
     Callback = fun(DbName, Acc) -> [DbName | Acc] end,
-    DbNames = fabric2_fdb:transactional(fun(Tx) ->
+    DbNames = fabric2_fdb:TxFun(fun(Tx) ->
         fabric2_fdb:list_dbs(Tx, Callback, [], Options)
     end),
     lists:reverse(DbNames).
 
 
 list_dbs(UserFun, UserAcc0, Options) ->
+    TxFun = case fabric2_util:get_value(iterator, Options) of
+        true -> with_iter;
+        undefined -> transactional
+    end,
     FoldFun = fun
         (DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc))
     end,
-    fabric2_fdb:transactional(fun(Tx) ->
+    fabric2_fdb:TxFun(fun(Tx) ->
         try
             UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
             UserAcc2 = fabric2_fdb:list_dbs(
@@ -755,7 +763,11 @@ fold_docs(Db, UserFun, UserAcc) ->
 
 
 fold_docs(Db, UserFun, UserAcc0, Options) ->
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
+    TxFun = case fabric2_util:get_value(iterator, Options) of
+        true -> with_iter;
+        undefined -> transactional
+    end,
+    fabric2_fdb:TxFun(Db, fun(TxDb) ->
         try
             #{
                 db_prefix := DbPrefix
@@ -829,7 +841,11 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
 
 
 fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
-    fabric2_fdb:transactional(Db, fun(TxDb) ->
+    TxFun = case fabric2_util:get_value(iterator, Options) of
+        true -> with_iter;
+        undefined -> transactional
+    end,
+    fabric2_fdb:TxFun(Db, fun(TxDb) ->
         try
             #{
                 db_prefix := DbPrefix
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 6abe1f6..4c65b8e 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -24,6 +24,12 @@
     delete/1,
     exists/1,
 
+    create_iter/0,
+    create_iter/1,
+    destroy_iter/1,
+    with_iter/1,
+    with_iter/2,
+
     get_dir/1,
 
     list_dbs/4,
@@ -69,6 +75,17 @@
 -include("fabric2.hrl").
 
 
+-record(fold_acc, {  % State threaded through erlfdb:fold_range/6 by the fold helpers
+   start_key,        % Range start passed to erlfdb:fold_range/6
+   end_key,          % Range end passed to erlfdb:fold_range/6
+   skip,             % Rows still to drop before the user callback is invoked
+   opts,             % Option list handed to erlfdb:fold_range/6
+   ucallback,        % User callback, invoked as UCallback(KV, UAcc)
+   uacc              % Current user accumulator
+}).
+
+
+
 transactional(Fun) ->
     do_transaction(Fun, undefined).
 
@@ -307,6 +324,79 @@ exists(#{name := DbName} = Db) when is_binary(DbName) ->
     end.
 
 
+% Create a bare FDB transaction for iterator use, with writes disallowed
+% and retry_limit / max_retry_delay set to 0. Errors with
+% iterator_already_created if this process already holds a checkpoint.
+create_iter() ->
+    undefined =:= get(?PDICT_ITER_CHECKPOINT)
+        orelse error(iterator_already_created),
+    IterTx = erlfdb:create_transaction(get_db_handle()),
+    erlfdb:set_option(IterTx, disallow_writes),
+    erlfdb:set_option(IterTx, retry_limit, 0),
+    erlfdb:set_option(IterTx, max_retry_delay, 0),
+    IterTx.
+
+
+% Build an iterator handle for Db, which must not carry a transaction
+% yet: pass it through refresh/1, attach a fresh iterator transaction
+% and run ensure_current/1 on it. If that throws {?MODULE, reopen},
+% start over with the reopen flag set so reopen/1 is called.
+create_iter(#{tx := undefined} = Db) ->
+    try
+        Db1 = refresh(Db),
+        Reopen = maps:get(reopen, Db1, false),
+        Db2 = maps:remove(reopen, Db1),
+
+        Tx = create_iter(),
+        Db3 = case Reopen of
+            true -> reopen(Db2#{tx => Tx});
+            false -> Db2#{tx => Tx}
+        end,
+
+        % Here we might throw `reopen`
+        Db4 = ensure_current(Db3),
+
+        % This part might update the Db cache
+        ok = run_on_commit_fun(Tx),
+        erase({?PDICT_ON_COMMIT_FUN, Tx}),
+
+        % Save the initial Db handle so that reset_iter_tx/1 can check
+        % the same database still exists after a transaction reset
+        put(?PDICT_ITER_VALIDATE_DB, Db4),
+        Db4#{tx := Tx}
+    catch throw:{?MODULE, reopen} ->
+        create_iter(Db#{reopen => true})
+    end.
+
+
+% Tear down iterator state: erase the fold checkpoint and, for a Db
+% handle, also the saved validation Db, returning the handle without tx.
+destroy_iter({erlfdb_transaction, _}) ->
+    erase(?PDICT_ITER_CHECKPOINT);
+destroy_iter(#{tx := {erlfdb_transaction, _} = Tx} = Db) ->
+    erase(?PDICT_ITER_VALIDATE_DB),
+    destroy_iter(Tx),
+    Db#{tx := undefined}.
+
+
+% Call Fun with a fresh iterator transaction from create_iter/0 and
+% always run destroy_iter/1 afterwards, even if Fun raises.
+with_iter(Fun) when is_function(Fun, 1) ->
+    IterTx = create_iter(),
+    try Fun(IterTx)
+    after destroy_iter(IterTx)
+    end.
+
+
+% Call Fun with the iterator handle create_iter/1 builds for Db (which
+% must have no tx) and always run destroy_iter/1 on it afterwards.
+with_iter(#{tx := undefined} = Db, Fun) when is_function(Fun, 1) ->
+    Handle = create_iter(Db),
+    try Fun(Handle)
+    after destroy_iter(Handle)
+    end.
+
+
 get_dir(Tx) ->
     Root = erlfdb_directory:root(),
     Dir = fabric2_server:fdb_directory(),
@@ -821,19 +911,50 @@ fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
     } = ensure_current(Db),
     fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
 
-fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
-    case fabric2_util:get_value(limit, Options) of
-        0 ->
+fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options) ->
+    Iterator = fabric2_util:get_value(iterator, Options),
+    case {fabric2_util:get_value(limit, Options), Iterator} of
+        {0, _} ->
             % FoundationDB treats a limit of 0 as unlimited
             % so we have to guard for that here.
-            UserAcc;
-        _ ->
-            {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
-            Callback = fun fold_range_cb/2,
-            Acc = {skip, Skip, UserCallback, UserAcc},
-            {skip, _, UserCallback, OutAcc} =
-                    erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
-            OutAcc
+            Acc;
+        {_, undefined} ->
+            FAcc = get_fold_acc(RangePrefix, Callback, Acc, Options),
+            fold_range_tx(Tx, FAcc);
+        {_, true} ->
+            FAcc = get_fold_acc(RangePrefix, Callback, Acc, Options),
+            fold_range_iter(Tx, FAcc)
+    end.
+
+
+% Fold the range described by FAcc with one erlfdb:fold_range/6 call on
+% Tx, using fold_range_tx_cb/2 as the row callback, and return the
+% resulting user accumulator.
+fold_range_tx(Tx, #fold_acc{} = FAcc) ->
+    #fold_acc{start_key = From, end_key = To, opts = FoldOpts} = FAcc,
+    FoldResult = erlfdb:fold_range(
+        Tx, From, To, fun fold_range_tx_cb/2, FAcc, FoldOpts
+    ),
+    FoldResult#fold_acc.uacc.
+
+
+% Fold the range described by FAcc on the iterator transaction Tx. The
+% fold state is checkpointed in the process dictionary; if the fold
+% fails with TRANSACTION_TOO_OLD (1007) the transaction is reset and the
+% fold resumes from the last checkpoint. Returns the user accumulator.
+fold_range_iter(Tx, #fold_acc{} = FAcc) ->
+    #fold_acc{start_key = Start, end_key = End, opts = Opts} = FAcc,
+    Callback = fun fold_range_iter_cb/2,
+    put(?PDICT_ITER_CHECKPOINT, FAcc),
+    try erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts) of
+        #fold_acc{uacc = UAccOut} -> UAccOut
+    catch
+        error:{erlfdb_error, ?TRANSACTION_TOO_OLD} ->
+            % The row callback keeps the checkpoint current as rows arrive
+            #fold_acc{} = FAcc1 = get(?PDICT_ITER_CHECKPOINT),
+            couch_log:error("**** RESETTING Tx ~p FAcc1:~p~n", [Tx, FAcc1]),
+            ok = reset_iter_tx(Tx),
+            fold_range_iter(Tx, FAcc1)
+    end.
 
 
@@ -1277,7 +1398,7 @@ chunkify_binary(Data) ->
     end.
 
 
-get_fold_opts(RangePrefix, Options) ->
+get_fold_acc(RangePrefix, UserCallback, UserAcc, Options) ->
     Reverse = case fabric2_util:get_value(dir, Options) of
         rev -> true;
         _ -> false
@@ -1367,15 +1488,75 @@ get_fold_opts(RangePrefix, Options) ->
             ++ StreamingMode
             ++ Snapshot,
 
-    {StartKey3, EndKey3, Skip, OutOpts}.
+    #fold_acc{
+        start_key = StartKey3,
+        end_key = EndKey3,
+        skip = Skip,
+        opts = OutOpts,
+        ucallback = UserCallback,
+        uacc = UserAcc
+    }.
+
+
+% erlfdb:fold_range/6 row callback: drop rows while skip > 0, then feed
+% each row to the user callback and store the new user accumulator.
+fold_range_tx_cb(KV, #fold_acc{skip = 0} = FAcc) ->
+    #fold_acc{ucallback = UserFun, uacc = UserAcc} = FAcc,
+    FAcc#fold_acc{uacc = UserFun(KV, UserAcc)};
+fold_range_tx_cb(_KV, #fold_acc{skip = Left} = FAcc) when Left > 0 ->
+    FAcc#fold_acc{skip = Left - 1}.
+
+
+% Per-row callback for iterator folds. The checkpoint stored for each
+% row includes the updated user accumulator so a resumed fold keeps it.
+fold_range_iter_cb({K, V}, #fold_acc{skip = 0} = FAcc) ->
+    NewUAcc = (FAcc#fold_acc.ucallback)({K, V}, FAcc#fold_acc.uacc),
+    NewFAcc = (next_iter_acc(K, FAcc))#fold_acc{uacc = NewUAcc},
+    put(?PDICT_ITER_CHECKPOINT, NewFAcc),
+    NewFAcc;
 
+fold_range_iter_cb({K, _V}, #fold_acc{skip = N} = FAcc) when N > 0 ->
+    put(?PDICT_ITER_CHECKPOINT, FAcc),
+    next_iter_acc(K, FAcc).
 
-fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
-    NewAcc = Callback(KV, Acc),
-    {skip, 0, Callback, NewAcc};
 
-fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
-    {skip, N - 1, Callback, Acc}.
+% Advance the fold state past row K: decrement limit (when set) and skip,
+% and exclude K from the remaining range so a resumed fold won't repeat it.
+next_iter_acc(K, #fold_acc{skip = Skip, opts = Opts} = FAcc) ->
+    Opts1 = case fabric2_util:get_value(limit, Opts) of
+        undefined -> Opts;
+        N when is_integer(N), N > 0 ->
+            fabric2_util:replace_value(limit, N - 1, Opts)
+    end,
+    FAcc1 = FAcc#fold_acc{opts = Opts1, skip = max(0, Skip - 1)},
+    case fabric2_util:get_value(reverse, Opts, false) of
+        true -> FAcc1#fold_acc{end_key = K};
+        false -> FAcc1#fold_acc{start_key = erlfdb_key:first_greater_than(K)}
+    end.
+
+
+% Reset Tx, set its retry options back to 0 and re-validate the saved Db.
+reset_iter_tx({erlfdb_transaction, _} = Tx) ->
+    ok = erlfdb:reset(Tx),
+    [erlfdb:set_option(Tx, Opt, 0) || Opt <- [retry_limit, max_retry_delay]],
+    ok = iterator_db_validate(Tx, get(?PDICT_ITER_VALIDATE_DB)).
+
+
+% Check that Db still exists: read the uuid key under its db prefix through
+% Tx and error with database_does_not_exist unless it equals the handle's
+% uuid. With undefined instead of a Db there is nothing to check.
+iterator_db_validate({erlfdb_transaction, _}, undefined) ->
+    ok;
+
+iterator_db_validate({erlfdb_transaction, _} = Tx, #{} = Db) ->
+    #{uuid := Expected, name := DbName, layer_prefix := LayerPrefix} = Db,
+    DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+    UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
+    Found = erlfdb:wait(erlfdb:get(Tx, UUIDKey)),
+    case Found =:= Expected of
+        true -> ok;
+        false -> error(database_does_not_exist)
+    end.
 
 
 get_db_handle() ->
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
index 4e2e2d7..8d04219 100644
--- a/src/fabric/src/fabric2_util.erl
+++ b/src/fabric/src/fabric2_util.erl
@@ -31,6 +31,7 @@
 
     get_value/2,
     get_value/3,
+    replace_value/3,
     to_hex/1,
     from_hex/1,
     uuid/0
@@ -160,6 +161,10 @@ get_value(Key, List, Default) ->
     end.
 
 
+% Swap the first {Key, _} tuple in List for {Key, Val}; no-op if Key is absent.
+replace_value(Key, Val, List) -> lists:keyreplace(Key, 1, List, {Key, Val}).
+
+
 to_hex(Bin) ->
     list_to_binary(to_hex_int(Bin)).