You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/02/11 17:56:22 UTC

[couchdb] branch multi-transactional-iterators-3 updated (7f2086e -> 3de2ac2)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch multi-transactional-iterators-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 7f2086e  Use multi-transactional iterators
     new 3de2ac2  Use multi-transactional iterators

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (7f2086e)
            \
             N -- N -- N   refs/heads/multi-transactional-iterators-3 (3de2ac2)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/fabric/include/fabric2.hrl | 1 -
 1 file changed, 1 deletion(-)


[couchdb] 01/01: Use multi-transactional iterators

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch multi-transactional-iterators-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3de2ac2b04e9a28bedc9876f0ff7d02bebe1b291
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Jan 31 14:58:41 2020 -0500

    Use multi-transactional iterators
---
 src/fabric/include/fabric2.hrl    |   3 +
 src/fabric/src/fabric2_db.erl     |  12 ++-
 src/fabric/src/fabric2_fdb.erl    | 220 +++++++++++++++++++++++++++++++-------
 src/fabric/src/fabric2_server.erl |   1 +
 4 files changed, 195 insertions(+), 41 deletions(-)

diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 828a51b..0375da4 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -57,7 +57,10 @@
 -define(PDICT_TX_ID_KEY, '$fabric_tx_id').
 -define(PDICT_TX_RES_KEY, '$fabric_tx_result').
 -define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun').
+-define(PDICT_FOLD_ACC_STATE, '$fabric_fold_acc_state').
+
 -define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(TRANSACTION_TOO_OLD, 1007).
 
 
 -define(BINARY_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 6d015df..5b763bc 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -175,7 +175,8 @@ open(DbName, Options) ->
     case fabric2_server:fetch(DbName) of
         #{} = Db ->
             Db1 = maybe_set_user_ctx(Db, Options),
-            {ok, require_member_check(Db1)};
+            Db2 = set_tx_options(Db1, Options),
+            {ok, require_member_check(Db2)};
         undefined ->
             Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
                 fabric2_fdb:open(TxDb, Options)
@@ -214,7 +215,7 @@ list_dbs(Options) ->
     Callback = fun(DbName, Acc) -> [DbName | Acc] end,
     DbNames = fabric2_fdb:transactional(fun(Tx) ->
         fabric2_fdb:list_dbs(Tx, Callback, [], Options)
-    end),
+    end, Options),
     lists:reverse(DbNames).
 
 
@@ -235,7 +236,7 @@ list_dbs(UserFun, UserAcc0, Options) ->
         catch throw:{stop, FinalUserAcc} ->
             {ok, FinalUserAcc}
         end
-    end).
+    end, Options).
 
 
 is_admin(Db, {SecProps}) when is_list(SecProps) ->
@@ -1001,6 +1002,11 @@ maybe_set_user_ctx(Db, Options) ->
     end.
 
 
+set_tx_options(Db, Options) ->
+    TxOptions = fabric2_util:get_value(tx_options, Options, []),
+    Db#{tx_options := TxOptions}.
+
+
 is_member(Db, {SecProps}) when is_list(SecProps) ->
     case is_admin(Db, {SecProps}) of
         true ->
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 6abe1f6..2f1923b 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -69,18 +69,39 @@
 -include("fabric2.hrl").
 
 
-transactional(Fun) ->
-    do_transaction(Fun, undefined).
+-record(fold_acc, {
+    db,
+    restart_tx,
+    start_key,
+    end_key,
+    limit,
+    skip,
+    base_opts,
+    user_fun,
+    user_acc
+}).
+
+
+
+transactional(Fun) when is_function(Fun, 1) ->
+    transactional(Fun, []).
 
 
 transactional(DbName, Options, Fun) when is_binary(DbName) ->
     with_span(Fun, #{'db.name' => DbName}, fun() ->
         transactional(fun(Tx) ->
             Fun(init_db(Tx, DbName, Options))
-        end)
+        end, Options)
     end).
 
 
+transactional(DbName, Fun) when is_binary(DbName), is_function(Fun, 1) ->
+    transactional(DbName, [], Fun);
+
+transactional(Fun, Options) when is_function(Fun, 1), is_list(Options) ->
+    TxOptions = fabric2_util:get_value(tx_options, Options, []),
+    do_transaction(Fun, undefined, TxOptions);
+
 transactional(#{tx := undefined} = Db, Fun) ->
     DbName = maps:get(name, Db, undefined),
     try
@@ -91,13 +112,14 @@ transactional(#{tx := undefined} = Db, Fun) ->
             true -> undefined;
             false -> maps:get(layer_prefix, Db2)
         end,
+        Options = maps:get(tx_options, Db2, []),
         with_span(Fun, #{'db.name' => DbName}, fun() ->
             do_transaction(fun(Tx) ->
                 case Reopen of
                     true -> Fun(reopen(Db2#{tx => Tx}));
                     false -> Fun(Db2#{tx => Tx})
                 end
-            end, LayerPrefix)
+            end, LayerPrefix, Options)
         end)
     catch throw:{?MODULE, reopen} ->
         with_span('db.reopen', #{'db.name' => DbName}, fun() ->
@@ -112,19 +134,20 @@ transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) ->
     end).
 
 
-do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) ->
+do_transaction(Fun, LayerPrefix, Options) when is_function(Fun, 1) ->
     Db = get_db_handle(),
     try
         erlfdb:transactional(Db, fun(Tx) ->
-            case get(erlfdb_trace) of
+            TraceOpts = case get(erlfdb_trace) of
                 Name when is_binary(Name) ->
                     UId = erlang:unique_integer([positive]),
                     UIdBin = integer_to_binary(UId, 36),
                     TxId = <<Name/binary, "_", UIdBin/binary>>,
-                    erlfdb:set_option(Tx, transaction_logging_enable, TxId);
+                    [{transaction_logging_enable, TxId}];
                 _ ->
-                    ok
+                    []
             end,
+            apply_tx_options(Tx, TraceOpts ++ Options),
             case is_transaction_applied(Tx) of
                 true ->
                     get_previous_transaction_result();
@@ -184,6 +207,9 @@ create(#{} = Db0, Options) ->
     UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
     Options1 = lists:keydelete(user_ctx, 1, Options),
 
+    TxOptions = fabric2_util:get_value(tx_options, Options1, []),
+    Options2 = lists:keydelete(tx_options, 1, Options1),
+
     Db#{
         uuid => UUID,
         db_prefix => DbPrefix,
@@ -198,7 +224,8 @@ create(#{} = Db0, Options) ->
         after_doc_read => undefined,
         % All other db things as we add features,
 
-        db_options => Options1
+        db_options => Options2,
+        tx_options => TxOptions
     }.
 
 
@@ -221,6 +248,9 @@ open(#{} = Db0, Options) ->
     UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
     Options1 = lists:keydelete(user_ctx, 1, Options),
 
+    TxOptions = fabric2_util:get_value(tx_options, Options1, []),
+    Options2 = lists:keydelete(tx_options, 1, Options1),
+
     Db2 = Db1#{
         db_prefix => DbPrefix,
         db_version => DbVersion,
@@ -237,7 +267,8 @@ open(#{} = Db0, Options) ->
         before_doc_update => undefined,
         after_doc_read => undefined,
 
-        db_options => Options1
+        db_options => Options2,
+        tx_options => TxOptions
     },
 
     Db3 = load_config(Db2),
@@ -815,25 +846,52 @@ get_last_change(#{} = Db) ->
     end.
 
 
-fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
+fold_range(#{} = Db, RangePrefix, UserFun, UserAcc, Options) ->
     #{
         tx := Tx
-    } = ensure_current(Db),
-    fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
-
-fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
-    case fabric2_util:get_value(limit, Options) of
-        0 ->
-            % FoundationDB treats a limit of 0 as unlimited
-            % so we have to guard for that here.
-            UserAcc;
-        _ ->
-            {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
-            Callback = fun fold_range_cb/2,
-            Acc = {skip, Skip, UserCallback, UserAcc},
-            {skip, _, UserCallback, OutAcc} =
-                    erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
-            OutAcc
+    } = CurrDb = ensure_current(Db),
+    % FoundationDB treats a limit of 0 as unlimited so we guard against it
+    case fabric2_util:get_value(limit, Options) of 0 -> UserAcc; _ ->
+        FAcc = get_fold_acc(CurrDb, RangePrefix, UserFun, UserAcc, Options),
+        try
+            fold_range(Tx, FAcc)
+        after
+            erase(?PDICT_FOLD_ACC_STATE)
+        end
+    end;
+
+fold_range({tx, Tx}, RangePrefix, UserFun, UserAcc, Options) ->
+    % FoundationDB treats a limit of 0 as unlimited so we guard against it
+    case fabric2_util:get_value(limit, Options) of 0 -> UserAcc; _ ->
+        FAcc = get_fold_acc(undefined, RangePrefix, UserFun, UserAcc, Options),
+        try
+            fold_range(Tx, FAcc)
+        after
+            erase(?PDICT_FOLD_ACC_STATE)
+        end
+    end.
+
+
+fold_range(Tx, FAcc) ->
+    #fold_acc{
+        start_key = Start,
+        end_key = End,
+        limit = Limit,
+        base_opts = BaseOpts,
+        restart_tx = DoRestart
+    } = FAcc,
+    case DoRestart of false -> ok; true ->
+        ok = erlfdb:set_option(Tx, disallow_writes)
+    end,
+    Opts = [{limit, Limit} | BaseOpts],
+    Callback = fun fold_range_cb/2,
+    try
+        #fold_acc{
+            user_acc = FinalUserAcc
+        } = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts),
+        FinalUserAcc
+    catch error:{erlfdb_error, ?TRANSACTION_TOO_OLD} when DoRestart ->
+        fold_range(Tx, restart_fold(Tx, FAcc))
     end.
 
 
@@ -1277,7 +1335,14 @@ chunkify_binary(Data) ->
     end.
 
 
-get_fold_opts(RangePrefix, Options) ->
+get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options)
+        when is_map(Db) orelse Db =:= undefined ->
+
+    BaseAcc = case fabric2_util:get_value(restart_tx, Options) of
+        true -> #fold_acc{db = Db, restart_tx = true};
+        undefined -> #fold_acc{db = undefined, restart_tx = false}
+    end,
+
     Reverse = case fabric2_util:get_value(dir, Options) of
         rev -> true;
         _ -> false
@@ -1342,8 +1407,8 @@ get_fold_opts(RangePrefix, Options) ->
     end,
 
     Limit = case fabric2_util:get_value(limit, Options) of
-        L when is_integer(L), L >= 0 -> [{limit, L + Skip}];
-        undefined -> []
+        L when is_integer(L), L >= 0 -> L + Skip;
+        undefined -> 0
     end,
 
     TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
@@ -1361,21 +1426,66 @@ get_fold_opts(RangePrefix, Options) ->
         B when is_boolean(B) -> [{snapshot, B}]
     end,
 
-    OutOpts = [{reverse, Reverse}]
-            ++ Limit
+    BaseOpts = [{reverse, Reverse}]
             ++ TargetBytes
             ++ StreamingMode
             ++ Snapshot,
 
-    {StartKey3, EndKey3, Skip, OutOpts}.
+    BaseAcc#fold_acc{
+        start_key = StartKey3,
+        end_key = EndKey3,
+        skip = Skip,
+        limit = Limit,
+        base_opts = BaseOpts,
+        user_fun = UserCallback,
+        user_acc = UserAcc
+    }.
 
 
-fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
-    NewAcc = Callback(KV, Acc),
-    {skip, 0, Callback, NewAcc};
+fold_range_cb({K, V}, #fold_acc{} = Acc) ->
+    #fold_acc{
+        skip = Skip,
+        limit = Limit,
+        user_fun = UserFun,
+        user_acc = UserAcc,
+        base_opts = Opts
+    } = Acc,
+    Acc1 = case Skip =:= 0 of
+        true ->
+            UserAcc1 = UserFun({K, V}, UserAcc),
+            Acc#fold_acc{limit = Limit - 1, user_acc = UserAcc1};
+        false ->
+            Acc#fold_acc{skip = Skip - 1, limit = Limit - 1}
+    end,
+    Acc2 = case fabric2_util:get_value(reverse, Opts, false) of
+        true -> Acc1#fold_acc{end_key = K};
+        false ->Acc1#fold_acc{start_key = K}
+    end,
+    put(?PDICT_FOLD_ACC_STATE, Acc2),
+    Acc2.
 
-fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
-    {skip, N - 1, Callback, Acc}.
+
+restart_fold(Tx, #fold_acc{} = Acc) ->
+    erase(?PDICT_CHECKED_MD_IS_CURRENT),
+    ok = erlfdb:reset(Tx),
+    case get(?PDICT_FOLD_ACC_STATE) of
+        #fold_acc{db = Db0} = Acc1 ->
+            erase(?PDICT_FOLD_ACC_STATE),
+            Db = check_db_instance(Db0),
+            Acc1#fold_acc{db = Db};
+        undefined ->
+            #fold_acc{
+                db = Db,
+                start_key = Start,
+                end_key = End,
+                limit = Limit,
+                user_fun = UserFun
+            } = Acc,
+            Db = check_db_instance(Acc#fold_acc.db),
+            Args = [Db, Start, End, Limit, UserFun],
+            couch_log:error("fold_range not progressing ~p ~p:~p ~p ~p", Args),
+            Acc#fold_acc{db = Db}
+    end.
 
 
 get_db_handle() ->
@@ -1415,6 +1525,30 @@ ensure_current(#{} = Db0, CheckDbVersion) ->
     end.
 
 
+check_db_instance(undefined) ->
+    undefined;
+
+check_db_instance(#{} = Db) ->
+    require_transaction(Db),
+    case check_metadata_version(Db) of
+        {current, Db1} ->
+            Db1;
+        {stale, Db1} ->
+            #{
+                tx := Tx,
+                uuid := UUID,
+                name := DbName,
+                layer_prefix := LayerPrefix
+            } = Db1,
+            DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+            UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
+            case erlfdb:wait(erlfdb:get(Tx, UUIDKey)) of
+                UUID -> Db1;
+                _ -> error(database_does_not_exist)
+            end
+    end.
+
+
 is_transaction_applied(Tx) ->
     is_commit_unknown_result()
         andalso has_transaction_id()
@@ -1512,3 +1646,13 @@ with_span(Operation, ExtraTags, Fun) ->
         false ->
             Fun()
     end.
+
+
+apply_tx_options(Tx, Options) ->
+    lists:foreach(fun(Option) ->
+        case Option of
+            K when is_atom(K) -> erlfdb:set_option(Tx, K);
+            {K, V} -> erlfdb:set_option(Tx, K, V)
+        end
+    end, Options),
+    Tx.
diff --git a/src/fabric/src/fabric2_server.erl b/src/fabric/src/fabric2_server.erl
index b1c38ef..acacb0c 100644
--- a/src/fabric/src/fabric2_server.erl
+++ b/src/fabric/src/fabric2_server.erl
@@ -58,6 +58,7 @@ fetch(DbName) when is_binary(DbName) ->
 store(#{name := DbName} = Db0) when is_binary(DbName) ->
     Db1 = Db0#{
         tx := undefined,
+        tx_options := [],
         user_ctx := #user_ctx{},
         security_fun := undefined
     },