You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/02/07 21:06:53 UTC
[couchdb] 01/01: Use multi-transactional iterators
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch multi-transactional-iterators-2
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 194a8bc05a69a0501f9cf3a2c16da1a153168232
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Jan 31 14:58:41 2020 -0500
Use multi-transactional iterators
---
src/fabric/include/fabric2.hrl | 5 +-
src/fabric/src/fabric2_db.erl | 24 ++++-
src/fabric/src/fabric2_fdb.erl | 217 ++++++++++++++++++++++++++++++++++++----
src/fabric/src/fabric2_util.erl | 5 +
4 files changed, 228 insertions(+), 23 deletions(-)
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 828a51b..0be27fb 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -57,7 +57,10 @@
-define(PDICT_TX_ID_KEY, '$fabric_tx_id').
-define(PDICT_TX_RES_KEY, '$fabric_tx_result').
-define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun').
--define(COMMIT_UNKNOWN_RESULT, 1021).
+% Process-dictionary key holding the #fold_acc{} checkpoint of the iterator
+% fold in progress; a fold restarted after a transaction reset resumes from it.
+-define(PDICT_ITER_CHECKPOINT, '$fabric_iter_checkpoint').
+% Process-dictionary key holding the Db handle saved when an iterator Db is
+% created, used to re-validate the database after a transaction reset.
+-define(PDICT_ITER_VALIDATE_DB, '$fabric_iter_validate_db').
+% FoundationDB error code 1021 (commit_unknown_result).
+-define(COMMIT_UNKNOWN_RESULT, 1021).
+% FoundationDB error code 1007 (transaction_too_old), matched as
+% {erlfdb_error, ?TRANSACTION_TOO_OLD} to trigger an iterator restart.
+-define(TRANSACTION_TOO_OLD, 1007).
-define(BINARY_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 6d015df..7b9e726 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -211,18 +211,26 @@ list_dbs() ->
list_dbs(Options) ->
+    % Only an explicit `{iterator, true}` selects the restartable
+    % multi-transaction iterator. Any other value (absent, `false`, ...)
+    % keeps the single-transaction path instead of crashing with case_clause.
+    TxFun = case fabric2_util:get_value(iterator, Options) of
+        true -> with_iter;
+        _ -> transactional
+    end,
Callback = fun(DbName, Acc) -> [DbName | Acc] end,
- DbNames = fabric2_fdb:transactional(fun(Tx) ->
+ DbNames = fabric2_fdb:TxFun(fun(Tx) ->
fabric2_fdb:list_dbs(Tx, Callback, [], Options)
end),
lists:reverse(DbNames).
list_dbs(UserFun, UserAcc0, Options) ->
+ TxFun = case fabric2_util:get_value(iterator, Options) of
+ true -> with_iter;
+ undefined -> transactional
+ end,
FoldFun = fun
(DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc))
end,
- fabric2_fdb:transactional(fun(Tx) ->
+ fabric2_fdb:TxFun(fun(Tx) ->
try
UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
UserAcc2 = fabric2_fdb:list_dbs(
@@ -755,7 +763,11 @@ fold_docs(Db, UserFun, UserAcc) ->
fold_docs(Db, UserFun, UserAcc0, Options) ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
+ TxFun = case fabric2_util:get_value(iterator, Options) of
+ true -> with_iter;
+ undefined -> transactional
+ end,
+ fabric2_fdb:TxFun(Db, fun(TxDb) ->
try
#{
db_prefix := DbPrefix
@@ -829,7 +841,11 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
+ TxFun = case fabric2_util:get_value(iterator, Options) of
+ true -> with_iter;
+ undefined -> transactional
+ end,
+ fabric2_fdb:TxFun(Db, fun(TxDb) ->
try
#{
db_prefix := DbPrefix
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 6abe1f6..4c65b8e 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -24,6 +24,12 @@
delete/1,
exists/1,
+ create_iter/0,
+ create_iter/1,
+ destroy_iter/1,
+ with_iter/1,
+ with_iter/2,
+
get_dir/1,
list_dbs/4,
@@ -69,6 +75,17 @@
-include("fabric2.hrl").
+% Accumulator threaded through fold_range_tx/2 and fold_range_iter/2.
+-record(fold_acc, {
+ start_key, % range start passed to erlfdb:fold_range/6
+ end_key, % range end passed to erlfdb:fold_range/6
+ skip, % rows still to be skipped before the user callback is invoked
+ opts, % erlfdb:fold_range/6 options; `limit` and `reverse` are read from it
+ ucallback, % user callback, called as UCallback({K, V}, UAcc)
+ uacc % user accumulator returned when the fold finishes
+}).
+
+
+
% Single-argument form, used as fabric2_fdb:transactional(fun(Tx) -> ... end):
% delegates to do_transaction/2 with `undefined` as its second argument.
transactional(Fun) ->
do_transaction(Fun, undefined).
@@ -307,6 +324,79 @@ exists(#{name := DbName} = Db) when is_binary(DbName) ->
end.
+% Creates a bare iterator transaction with writes disallowed and automatic
+% retries disabled (retry limit and max retry delay both 0); iterator folds
+% restart themselves from a checkpoint instead (see fold_range_iter/2).
+%
+% Raises error(iterator_already_created) when a fold checkpoint is already
+% present in this process's dictionary.
+create_iter() ->
+    undefined =:= get(?PDICT_ITER_CHECKPOINT)
+        orelse error(iterator_already_created),
+    IterTx = erlfdb:create_transaction(get_db_handle()),
+    erlfdb:set_option(IterTx, disallow_writes),
+    lists:foreach(
+        fun(Opt) -> erlfdb:set_option(IterTx, Opt, 0) end,
+        [retry_limit, max_retry_delay]
+    ),
+    IterTx.
+
+
+% Creates an iterator Db handle from a Db that has no transaction attached.
+%
+% Refreshes Db, attaches a fresh iterator transaction from create_iter/0
+% (calling reopen/1 first when the map carries `reopen => true`) and passes
+% the handle through ensure_current/1. If that throws {?MODULE, reopen}, the
+% whole sequence is retried with `reopen => true` added to the original Db.
+% NOTE(review): this assumes refresh/1 preserves the `reopen` key, since it
+% is read back from Db1.
+% NOTE(review): the transaction created by an attempt that throws `reopen`
+% is not passed to destroy_iter/1 before retrying.
+create_iter(#{tx := undefined} = Db) ->
+ try
+
+ Db1 = refresh(Db),
+
+ Reopen = maps:get(reopen, Db1, false),
+ Db2 = maps:remove(reopen, Db1),
+
+ Tx = create_iter(),
+
+ Db3 = case Reopen of
+ true -> reopen(Db2#{tx => Tx});
+ false -> Db2#{tx => Tx}
+ end,
+
+ % Here we might throw `reopen`
+ Db4 = ensure_current(Db3),
+
+ % This part might update the Db cache. with_iter/1,2 never commit
+ % iterator transactions, so any on-commit fun is run here and its
+ % process-dictionary entry is cleared.
+ ok = run_on_commit_fun(Tx),
+ erase({?PDICT_ON_COMMIT_FUN, Tx}),
+
+ % Save the initial Db handle so that, after a transaction reset,
+ % reset_iter_tx/1 can check the same db (same UUID) still exists
+ put(?PDICT_ITER_VALIDATE_DB, Db4),
+
+ Db4#{tx := Tx}
+ catch throw:{?MODULE, reopen} ->
+ create_iter(Db#{reopen => true})
+ end.
+
+
+% Tears down an iterator created by create_iter/0 or create_iter/1.
+%
+% Given a bare transaction: erases the fold checkpoint kept in the process
+% dictionary and returns what erase/1 returns. The transaction handle itself
+% is not closed here.
+% Given a Db handle: also erases the Db saved for validation and returns the
+% handle with `tx` set back to `undefined`.
+destroy_iter({erlfdb_transaction, _}) ->
+    erase(?PDICT_ITER_CHECKPOINT);
+
+destroy_iter(#{tx := {erlfdb_transaction, _} = Tx} = Db) ->
+    erase(?PDICT_ITER_VALIDATE_DB),
+    destroy_iter(Tx),
+    Db#{tx := undefined}.
+
+
+% Runs Fun with a fresh bare iterator transaction and returns Fun's result.
+% The iterator is destroyed whether Fun returns normally or raises.
+with_iter(Fun) when is_function(Fun, 1) ->
+    IterTx = create_iter(),
+    try Fun(IterTx)
+    after destroy_iter(IterTx)
+    end.
+
+
+% Runs Fun with a Db handle bound to a fresh iterator transaction and returns
+% Fun's result. The iterator is destroyed whether Fun returns or raises.
+with_iter(#{tx := undefined} = Db, Fun) when is_function(Fun, 1) ->
+    Handle = create_iter(Db),
+    try Fun(Handle)
+    after destroy_iter(Handle)
+    end.
+
+
get_dir(Tx) ->
Root = erlfdb_directory:root(),
Dir = fabric2_server:fdb_directory(),
@@ -821,19 +911,50 @@ fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
} = ensure_current(Db),
fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
-fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
- case fabric2_util:get_value(limit, Options) of
- 0 ->
+% Folds RangePrefix inside Tx. With `{iterator, true}` in Options the fold
+% restarts itself after transaction_too_old; otherwise it runs in a single
+% transaction.
+fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options) ->
+    case fabric2_util:get_value(limit, Options) of
+        0 ->
% FoundationDB treats a limit of 0 as unlimited
% so we have to guard for that here.
- UserAcc;
- _ ->
- {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
- Callback = fun fold_range_cb/2,
- Acc = {skip, Skip, UserCallback, UserAcc},
- {skip, _, UserCallback, OutAcc} =
- erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
- OutAcc
+            Acc;
+        _ ->
+            FAcc = get_fold_acc(RangePrefix, Callback, Acc, Options),
+            % Only an explicit `{iterator, true}` selects the restartable
+            % fold; any other value (absent, `false`, ...) uses the
+            % single-transaction fold instead of crashing with case_clause.
+            case fabric2_util:get_value(iterator, Options) of
+                true -> fold_range_iter(Tx, FAcc);
+                _ -> fold_range_tx(Tx, FAcc)
+            end
+    end.
+
+
+% Folds the accumulator's range in the single transaction Tx and returns the
+% final user accumulator. Pending skips are consumed by fold_range_tx_cb/2
+% before the user callback sees any row.
+fold_range_tx(Tx, #fold_acc{start_key = Start, end_key = End, opts = Opts} = FAcc) ->
+    #fold_acc{uacc = FinalUAcc} =
+        erlfdb:fold_range(Tx, Start, End, fun fold_range_tx_cb/2, FAcc, Opts),
+    FinalUAcc.
+
+
+% Folds the accumulator's range in the iterator transaction Tx and returns the
+% final user accumulator.
+%
+% The #fold_acc{} checkpoint is kept under ?PDICT_ITER_CHECKPOINT and is
+% advanced by fold_range_iter_cb/2 after every row. When FoundationDB raises
+% transaction_too_old, the transaction is reset and the database re-validated
+% (reset_iter_tx/1), then the fold continues from the checkpoint.
+fold_range_iter(Tx, #fold_acc{} = FAcc) ->
+    #fold_acc{
+        start_key = Start,
+        end_key = End,
+        opts = Opts,
+        uacc = UAcc
+    } = FAcc,
+    case fabric2_util:get_value(limit, Opts) of
+        0 ->
+            % fold_range/5 already guards an initial limit of 0, so this is
+            % a restart whose checkpoint used up the whole row limit.
+            % FoundationDB would treat 0 as unlimited; the fold is done.
+            UAcc;
+        _ ->
+            put(?PDICT_ITER_CHECKPOINT, FAcc),
+            try erlfdb:fold_range(Tx, Start, End, fun fold_range_iter_cb/2, FAcc, Opts) of
+                #fold_acc{uacc = UAccOut} -> UAccOut
+            catch
+                error:{erlfdb_error, ?TRANSACTION_TOO_OLD} ->
+                    #fold_acc{} = FAcc1 = get(?PDICT_ITER_CHECKPOINT),
+                    couch_log:debug("~p : iterator transaction too old, resetting ~p", [?MODULE, Tx]),
+                    ok = reset_iter_tx(Tx),
+                    fold_range_iter(Tx, FAcc1)
+            end
end.
@@ -1277,7 +1398,7 @@ chunkify_binary(Data) ->
end.
-get_fold_opts(RangePrefix, Options) ->
+get_fold_acc(RangePrefix, UserCallback, UserAcc, Options) ->
Reverse = case fabric2_util:get_value(dir, Options) of
rev -> true;
_ -> false
@@ -1367,15 +1488,75 @@ get_fold_opts(RangePrefix, Options) ->
++ StreamingMode
++ Snapshot,
- {StartKey3, EndKey3, Skip, OutOpts}.
+ #fold_acc{
+ start_key = StartKey3,
+ end_key = EndKey3,
+ skip = Skip,
+ opts = OutOpts,
+ ucallback = UserCallback,
+ uacc = UserAcc
+ }.
+
+
+% Per-row callback for single-transaction folds: while skips remain, the row
+% only decrements the skip count; afterwards each row is handed to the user
+% callback and its result becomes the new user accumulator.
+fold_range_tx_cb(_KV, #fold_acc{skip = Remaining} = FAcc) when Remaining > 0 ->
+    FAcc#fold_acc{skip = Remaining - 1};
+
+fold_range_tx_cb(KV, #fold_acc{skip = 0, ucallback = Fun, uacc = Acc0} = FAcc) ->
+    FAcc#fold_acc{uacc = Fun(KV, Acc0)}.
+
+
+% Per-row callback for restartable iterator folds.
+%
+% Runs the user callback on {K, V}, or, while skip > 0, only consumes one skip.
+% It then advances the accumulator past K (next_iter_acc/2) and stores the
+% result as the checkpoint fold_range_iter/2 resumes from after a reset.
+% The stored checkpoint already contains K's effect on the user accumulator
+% and its range excludes K, so a restart neither replays nor drops that row.
+fold_range_iter_cb({K, V}, #fold_acc{skip = 0} = FAcc) ->
+    #fold_acc{ucallback = UCallback, uacc = UAcc} = FAcc,
+    NewUAcc = UCallback({K, V}, UAcc),
+    NewFAcc = next_iter_acc(K, FAcc#fold_acc{uacc = NewUAcc}),
+    put(?PDICT_ITER_CHECKPOINT, NewFAcc),
+    NewFAcc;
+
+fold_range_iter_cb({K, _V}, #fold_acc{skip = N} = FAcc) when N > 0 ->
+    NewFAcc = next_iter_acc(K, FAcc),
+    put(?PDICT_ITER_CHECKPOINT, NewFAcc),
+    NewFAcc.
+
-fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
- NewAcc = Callback(KV, Acc),
- {skip, 0, Callback, NewAcc};
-fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
- {skip, N - 1, Callback, Acc}.
+
+% Returns FAcc advanced past key K: one fewer pending skip, one fewer row of
+% remaining limit, and the range narrowed so that K itself is excluded.
+% NOTE(review): the `limit` in Opts is assumed to also count skipped rows
+% (it is decremented for them too); confirm against get_fold_acc/4.
+next_iter_acc(K, #fold_acc{} = FAcc) ->
+    #fold_acc{skip = Skip, opts = Opts} = FAcc,
+    Opts1 = case fabric2_util:get_value(limit, Opts) of
+        N when is_integer(N), N > 0 ->
+            fabric2_util:replace_value(limit, N - 1, Opts);
+        _ ->
+            Opts
+    end,
+    FAcc1 = FAcc#fold_acc{opts = Opts1, skip = max(0, Skip - 1)},
+    case fabric2_util:get_value(reverse, Opts, false) of
+        true ->
+            % The end key is exclusive, so K itself is the new upper bound.
+            FAcc1#fold_acc{end_key = K};
+        false ->
+            % The start key is inclusive; K followed by a 0x00 byte is the
+            % smallest key strictly greater than K.
+            FAcc1#fold_acc{start_key = <<K/binary, 16#00>>}
+    end.
+
+
+% Resets iterator transaction Tx so a fold can resume from its checkpoint
+% (called by fold_range_iter/2 after transaction_too_old). It then sets the
+% retry limit and max retry delay back to 0 and re-validates the Db saved by
+% create_iter/1, if any.
+% NOTE(review): disallow_writes, set by create_iter/0, is not re-applied
+% here; confirm whether erlfdb:reset/1 preserves it.
+reset_iter_tx({erlfdb_transaction, _} = Tx) ->
+    ok = erlfdb:reset(Tx),
+    lists:foreach(
+        fun(Opt) -> erlfdb:set_option(Tx, Opt, 0) end,
+        [retry_limit, max_retry_delay]
+    ),
+    SavedDb = get(?PDICT_ITER_VALIDATE_DB),
+    ok = iterator_db_validate(Tx, SavedDb).
+
+
+% Checks inside Tx that the database saved when the iterator was created still
+% has the same UUID stored under its config key. Returns ok on a match and
+% raises error(database_does_not_exist) otherwise. Without a saved Db
+% (`undefined`) nothing is checked.
+iterator_db_validate({erlfdb_transaction, _}, undefined) ->
+    ok;
+
+iterator_db_validate({erlfdb_transaction, _} = Tx, #{} = Db) ->
+    #{
+        uuid := ExpectedUUID,
+        name := DbName,
+        layer_prefix := LayerPrefix
+    } = Db,
+    DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+    UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
+    StoredUUID = erlfdb:wait(erlfdb:get(Tx, UUIDKey)),
+    if
+        StoredUUID =:= ExpectedUUID -> ok;
+        true -> error(database_does_not_exist)
+    end.
get_db_handle() ->
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
index 4e2e2d7..8d04219 100644
--- a/src/fabric/src/fabric2_util.erl
+++ b/src/fabric/src/fabric2_util.erl
@@ -31,6 +31,7 @@
get_value/2,
get_value/3,
+ replace_value/3,
to_hex/1,
from_hex/1,
uuid/0
@@ -160,6 +161,10 @@ get_value(Key, List, Default) ->
end.
+% Replaces the first tuple in List whose first element equals Key with
+% {Key, Val}. List is returned unchanged when no such tuple exists.
+replace_value(Key, Val, List) ->
+    NewEntry = {Key, Val},
+    lists:keyreplace(Key, 1, List, NewEntry).
+
+
to_hex(Bin) ->
list_to_binary(to_hex_int(Bin)).