You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/02/11 17:53:25 UTC

[couchdb] branch multi-transactional-iterators-3 created (now 7f2086e)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch multi-transactional-iterators-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 7f2086e  Use multi-transactional iterators

This branch includes the following new commits:

     new 7f2086e  Use multi-transactional iterators

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: Use multi-transactional iterators

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch multi-transactional-iterators-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 7f2086e66878f8efd9707707d793b63144ccbfab
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Jan 31 14:58:41 2020 -0500

    Use multi-transactional iterators
---
 src/fabric/include/fabric2.hrl    |   4 +
 src/fabric/src/fabric2_db.erl     |  12 ++-
 src/fabric/src/fabric2_fdb.erl    | 220 +++++++++++++++++++++++++++++++-------
 src/fabric/src/fabric2_server.erl |   1 +
 4 files changed, 196 insertions(+), 41 deletions(-)

diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 828a51b..61bf458 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -57,7 +57,11 @@
 -define(PDICT_TX_ID_KEY, '$fabric_tx_id').
 -define(PDICT_TX_RES_KEY, '$fabric_tx_result').
 -define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun').
+-define(PDICT_FOLD_ACC_STATE, '$fabric_fold_acc_state').
+-define(PDICT_ITER_VALIDATE_DB, '$fabric_iter_validate_db').
+
 -define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(TRANSACTION_TOO_OLD, 1007).
 
 
 -define(BINARY_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 6d015df..5b763bc 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -175,7 +175,8 @@ open(DbName, Options) ->
     case fabric2_server:fetch(DbName) of
         #{} = Db ->
             Db1 = maybe_set_user_ctx(Db, Options),
-            {ok, require_member_check(Db1)};
+            Db2 = set_tx_options(Db1, Options),
+            {ok, require_member_check(Db2)};
         undefined ->
             Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
                 fabric2_fdb:open(TxDb, Options)
@@ -214,7 +215,7 @@ list_dbs(Options) ->
     Callback = fun(DbName, Acc) -> [DbName | Acc] end,
     DbNames = fabric2_fdb:transactional(fun(Tx) ->
         fabric2_fdb:list_dbs(Tx, Callback, [], Options)
-    end),
+    end, Options),
     lists:reverse(DbNames).
 
 
@@ -235,7 +236,7 @@ list_dbs(UserFun, UserAcc0, Options) ->
         catch throw:{stop, FinalUserAcc} ->
             {ok, FinalUserAcc}
         end
-    end).
+    end, Options).
 
 
 is_admin(Db, {SecProps}) when is_list(SecProps) ->
@@ -1001,6 +1002,11 @@ maybe_set_user_ctx(Db, Options) ->
     end.
 
 
+set_tx_options(Db, Options) -> % copy tx_options from Options (default []) into Db
+    TxOptions = fabric2_util:get_value(tx_options, Options, []),
+    Db#{tx_options := TxOptions}. % `:=` requires Db to already carry tx_options
+
+
 is_member(Db, {SecProps}) when is_list(SecProps) ->
     case is_admin(Db, {SecProps}) of
         true ->
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 6abe1f6..2f1923b 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -69,18 +69,39 @@
 -include("fabric2.hrl").
 
 
-transactional(Fun) ->
-    do_transaction(Fun, undefined).
+-record(fold_acc, { % state threaded through fold_range/2 and fold_range_cb/2
+    db, % db map when restart_tx is true, otherwise undefined
+    restart_tx, % true: reset Tx and resume the fold on transaction_too_old
+    start_key, % updated by fold_range_cb/2 in forward folds
+    end_key, % updated by fold_range_cb/2 in reverse folds
+    limit, % rows left to read, including skipped ones; initially 0 = unlimited
+    skip, % rows still to skip before user_fun is called
+    base_opts, % erlfdb fold options other than limit
+    user_fun, % called as UserFun({K, V}, UserAcc)
+    user_acc
+}).
+
+
+
+transactional(Fun) when is_function(Fun, 1) -> % same as transactional(Fun, []): no tx_options
+    transactional(Fun, []).
 
 
 transactional(DbName, Options, Fun) when is_binary(DbName) ->
     with_span(Fun, #{'db.name' => DbName}, fun() ->
         transactional(fun(Tx) ->
             Fun(init_db(Tx, DbName, Options))
-        end)
+        end, Options) % tx_options found in Options get applied to the transaction
     end).
 
 
+transactional(DbName, Fun) when is_binary(DbName), is_function(Fun, 1) ->
+    transactional(DbName, [], Fun);
+
+transactional(Fun, Options) when is_function(Fun, 1), is_list(Options) ->
+    TxOptions = fabric2_util:get_value(tx_options, Options, []),
+    do_transaction(Fun, undefined, TxOptions);
+
 transactional(#{tx := undefined} = Db, Fun) ->
     DbName = maps:get(name, Db, undefined),
     try
@@ -91,13 +112,14 @@ transactional(#{tx := undefined} = Db, Fun) ->
             true -> undefined;
             false -> maps:get(layer_prefix, Db2)
         end,
+        Options = maps:get(tx_options, Db2, []),
         with_span(Fun, #{'db.name' => DbName}, fun() ->
             do_transaction(fun(Tx) ->
                 case Reopen of
                     true -> Fun(reopen(Db2#{tx => Tx}));
                     false -> Fun(Db2#{tx => Tx})
                 end
-            end, LayerPrefix)
+            end, LayerPrefix, Options)
         end)
     catch throw:{?MODULE, reopen} ->
         with_span('db.reopen', #{'db.name' => DbName}, fun() ->
@@ -112,19 +134,20 @@ transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) ->
     end).
 
 
-do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) ->
+do_transaction(Fun, LayerPrefix, Options) when is_function(Fun, 1) ->
     Db = get_db_handle(),
     try
         erlfdb:transactional(Db, fun(Tx) ->
-            case get(erlfdb_trace) of
+            TraceOpts = case get(erlfdb_trace) of
                 Name when is_binary(Name) ->
                     UId = erlang:unique_integer([positive]),
                     UIdBin = integer_to_binary(UId, 36),
                     TxId = <<Name/binary, "_", UIdBin/binary>>,
-                    erlfdb:set_option(Tx, transaction_logging_enable, TxId);
+                    [{transaction_logging_enable, TxId}];
                 _ ->
-                    ok
+                    []
             end,
+            apply_tx_options(Tx, TraceOpts ++ Options),
             case is_transaction_applied(Tx) of
                 true ->
                     get_previous_transaction_result();
@@ -184,6 +207,9 @@ create(#{} = Db0, Options) ->
     UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
     Options1 = lists:keydelete(user_ctx, 1, Options),
 
+    TxOptions = fabric2_util:get_value(tx_options, Options1, []),
+    Options2 = lists:keydelete(tx_options, 1, Options1),
+
     Db#{
         uuid => UUID,
         db_prefix => DbPrefix,
@@ -198,7 +224,8 @@ create(#{} = Db0, Options) ->
         after_doc_read => undefined,
         % All other db things as we add features,
 
-        db_options => Options1
+        db_options => Options2,
+        tx_options => TxOptions
     }.
 
 
@@ -221,6 +248,9 @@ open(#{} = Db0, Options) ->
     UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
     Options1 = lists:keydelete(user_ctx, 1, Options),
 
+    TxOptions = fabric2_util:get_value(tx_options, Options1, []),
+    Options2 = lists:keydelete(tx_options, 1, Options1),
+
     Db2 = Db1#{
         db_prefix => DbPrefix,
         db_version => DbVersion,
@@ -237,7 +267,8 @@ open(#{} = Db0, Options) ->
         before_doc_update => undefined,
         after_doc_read => undefined,
 
-        db_options => Options1
+        db_options => Options2,
+        tx_options => TxOptions
     },
 
     Db3 = load_config(Db2),
@@ -815,25 +846,52 @@ get_last_change(#{} = Db) ->
     end.
 
 
-fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
+fold_range(#{} = Db, RangePrefix, UserFun, UserAcc, Options) ->
     #{
         tx := Tx
-    } = ensure_current(Db),
-    fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
-
-fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
-    case fabric2_util:get_value(limit, Options) of
-        0 ->
-            % FoundationDB treats a limit of 0 as unlimited
-            % so we have to guard for that here.
-            UserAcc;
-        _ ->
-            {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
-            Callback = fun fold_range_cb/2,
-            Acc = {skip, Skip, UserCallback, UserAcc},
-            {skip, _, UserCallback, OutAcc} =
-                    erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
-            OutAcc
+    } = CurrDb = ensure_current(Db),
+    % FoundationDB treats a limit of 0 as unlimited so we guard against it
+    case fabric2_util:get_value(limit, Options) of 0 -> UserAcc; _ ->
+        FAcc = get_fold_acc(CurrDb, RangePrefix, UserFun, UserAcc, Options),
+        try
+            fold_range(Tx, FAcc)
+        after
+            erase(?PDICT_FOLD_ACC_STATE) % NOTE(review): also clears an enclosing fold's state
+        end
+    end;
+
+fold_range({tx, Tx}, RangePrefix, UserFun, UserAcc, Options) ->
+    % FoundationDB treats a limit of 0 as unlimited so we guard against it
+    case fabric2_util:get_value(limit, Options) of 0 -> UserAcc; _ ->
+        FAcc = get_fold_acc(undefined, RangePrefix, UserFun, UserAcc, Options),
+        try
+            fold_range(Tx, FAcc)
+        after
+            erase(?PDICT_FOLD_ACC_STATE)
+        end
+    end.
+
+
+fold_range(Tx, FAcc) -> % one fold pass; with restart_tx, resumes on transaction_too_old
+    #fold_acc{
+        start_key = Start,
+        end_key = End,
+        limit = Limit,
+        base_opts = BaseOpts,
+        restart_tx = DoRestart
+    } = FAcc,
+    case DoRestart of false -> ok; true -> % Tx may be reset, so disallow writes
+        ok = erlfdb:set_option(Tx, disallow_writes)
+    end,
+    Opts = [{limit, Limit} | BaseOpts], % Limit shrinks as fold_range_cb/2 reads rows
+    Callback = fun fold_range_cb/2,
+    try
+        #fold_acc{
+            user_acc = FinalUserAcc
+        } = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts),
+        FinalUserAcc
+    catch error:{erlfdb_error, ?TRANSACTION_TOO_OLD} when DoRestart ->
+        fold_range(Tx, restart_fold(Tx, FAcc))
     end.
 
 
@@ -1277,7 +1335,14 @@ chunkify_binary(Data) ->
     end.
 
 
-get_fold_opts(RangePrefix, Options) ->
+get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options)
+        when is_map(Db) orelse Db =:= undefined ->
+
+    BaseAcc = case fabric2_util:get_value(restart_tx, Options) of
+        true -> #fold_acc{db = Db, restart_tx = true};
+        undefined -> #fold_acc{db = undefined, restart_tx = false}
+    end,
+
     Reverse = case fabric2_util:get_value(dir, Options) of
         rev -> true;
         _ -> false
@@ -1342,8 +1407,8 @@ get_fold_opts(RangePrefix, Options) ->
     end,
 
     Limit = case fabric2_util:get_value(limit, Options) of
-        L when is_integer(L), L >= 0 -> [{limit, L + Skip}];
-        undefined -> []
+        L when is_integer(L), L >= 0 -> L + Skip;
+        undefined -> 0
     end,
 
     TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
@@ -1361,21 +1426,66 @@ get_fold_opts(RangePrefix, Options) ->
         B when is_boolean(B) -> [{snapshot, B}]
     end,
 
-    OutOpts = [{reverse, Reverse}]
-            ++ Limit
+    BaseOpts = [{reverse, Reverse}]
             ++ TargetBytes
             ++ StreamingMode
             ++ Snapshot,
 
-    {StartKey3, EndKey3, Skip, OutOpts}.
+    BaseAcc#fold_acc{
+        start_key = StartKey3,
+        end_key = EndKey3,
+        skip = Skip,
+        limit = Limit,
+        base_opts = BaseOpts,
+        user_fun = UserCallback,
+        user_acc = UserAcc
+    }.
 
 
-fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
-    NewAcc = Callback(KV, Acc),
-    {skip, 0, Callback, NewAcc};
+fold_range_cb({K, V}, #fold_acc{} = Acc) -> % erlfdb:fold_range/6 row callback
+    #fold_acc{
+        skip = Skip,
+        limit = Limit,
+        user_fun = UserFun,
+        user_acc = UserAcc,
+        base_opts = Opts
+    } = Acc,
+    Acc1 = case Skip =:= 0 of % clamp limit at 0: FDB treats 0 as unlimited
+        true ->
+            UserAcc1 = UserFun({K, V}, UserAcc),
+            Acc#fold_acc{limit = max(0, Limit - 1), user_acc = UserAcc1};
+        false ->
+            Acc#fold_acc{skip = Skip - 1, limit = max(0, Limit - 1)}
+    end,
+    Acc2 = case fabric2_util:get_value(reverse, Opts, false) of % resume point
+        true -> Acc1#fold_acc{end_key = K}; % range end is exclusive
+        false -> Acc1#fold_acc{start_key = erlfdb_key:first_greater_than(K)}
+    end,
+    put(?PDICT_FOLD_ACC_STATE, Acc2), % picked up by restart_fold/2
+    Acc2.
 
-fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
-    {skip, N - 1, Callback, Acc}.
+
+restart_fold(Tx, #fold_acc{} = Acc) -> % reset Tx and rebuild the fold state to resume
+    erase(?PDICT_CHECKED_MD_IS_CURRENT),
+    ok = erlfdb:reset(Tx),
+    case get(?PDICT_FOLD_ACC_STATE) of
+        #fold_acc{db = Db0} = Acc1 -> % latest state saved by fold_range_cb/2
+            erase(?PDICT_FOLD_ACC_STATE),
+            Db = check_db_instance(Db0),
+            Acc1#fold_acc{db = Db};
+        undefined -> % no row processed since the fold started or last restarted
+            #fold_acc{
+                db = Db0,
+                start_key = Start,
+                end_key = End,
+                limit = Limit,
+                user_fun = UserFun
+            } = Acc,
+            Db = check_db_instance(Db0), % may return an updated map; must not match Db0
+            Args = [Db, Start, End, Limit, UserFun],
+            couch_log:error("fold_range not progressing ~p ~p:~p ~p ~p", Args),
+            Acc#fold_acc{db = Db}
+    end.
 
 
 get_db_handle() ->
@@ -1415,6 +1525,30 @@ ensure_current(#{} = Db0, CheckDbVersion) ->
     end.
 
 
+check_db_instance(undefined) -> % no db map to validate
+    undefined;
+
+check_db_instance(#{} = Db) -> % revalidate Db in Tx, e.g. after restart_fold/2 resets it
+    require_transaction(Db),
+    case check_metadata_version(Db) of
+        {current, Db1} ->
+            Db1;
+        {stale, Db1} -> % confirm the stored db uuid still matches Db's uuid
+            #{
+                tx := Tx,
+                uuid := UUID,
+                name := DbName,
+                layer_prefix := LayerPrefix
+            } = Db1,
+            DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+            UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
+            case erlfdb:wait(erlfdb:get(Tx, UUIDKey)) of
+                UUID -> Db1;
+                _ -> error(database_does_not_exist) % uuid missing or changed
+            end
+    end.
+
+
 is_transaction_applied(Tx) ->
     is_commit_unknown_result()
         andalso has_transaction_id()
@@ -1512,3 +1646,13 @@ with_span(Operation, ExtraTags, Fun) ->
         false ->
             Fun()
     end.
+
+
+apply_tx_options(Tx, Options) ->
+    % Set each option on Tx in list order: bare atoms are flags,
+    % {Name, Value} pairs carry a value. Returns the same Tx handle.
+    [case Opt of
+        {Name, Value} -> erlfdb:set_option(Tx, Name, Value);
+        Name when is_atom(Name) -> erlfdb:set_option(Tx, Name)
+    end || Opt <- Options],
+    Tx.
diff --git a/src/fabric/src/fabric2_server.erl b/src/fabric/src/fabric2_server.erl
index b1c38ef..acacb0c 100644
--- a/src/fabric/src/fabric2_server.erl
+++ b/src/fabric/src/fabric2_server.erl
@@ -58,6 +58,7 @@ fetch(DbName) when is_binary(DbName) ->
 store(#{name := DbName} = Db0) when is_binary(DbName) ->
     Db1 = Db0#{
         tx := undefined,
+        tx_options := [],
         user_ctx := #user_ctx{},
         security_fun := undefined
     },