You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/02/14 20:31:23 UTC

[couchdb] branch prototype/fdb-layer updated: Implement multi-transactional iterators for _changes feeds

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/fdb-layer by this push:
     new 148af9e  Implement multi-transactional iterators for _changes feeds
148af9e is described below

commit 148af9efadfe97382523c0259171278849186fe9
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Jan 31 14:58:41 2020 -0500

    Implement multi-transactional iterators for _changes feeds
    
    Previously, changes feeds would fail if they streamed data for more than five
    seconds. This was because of FoundationDB's transaction time limit. After the
    timeout fired, a 1007 (transaction_too_old) error was raised, and the
    transaction was retried. The emitted changes feed would often crash or simply
    hang because the HTTP state would be garbled as response data was re-sent over
    the same socket stream.
    
    To fix the issue, introduce a new `{restart_tx, true}` option for
    `fold_range/5`. This option sets up a new transaction to continue iterating
    over the range from where the last one left off.
    
    To avoid data being resent in the response stream, user callback functions must
    first read all the data they plan on sending during that callback, send it out,
    and after that they must not do any more db reads so as not to trigger a
    `transaction_too_old` error.
---
 src/fabric/include/fabric2.hrl                 |   4 +
 src/fabric/src/fabric2_db.erl                  |   7 +-
 src/fabric/src/fabric2_fdb.erl                 | 174 +++++++++++++++---
 src/fabric/test/fabric2_changes_fold_tests.erl | 241 +++++++++++++++++++++----
 src/fabric/test/fabric2_test.hrl               |   8 +
 5 files changed, 364 insertions(+), 70 deletions(-)

diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 828a51b..d07a737 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -57,6 +57,10 @@
 -define(PDICT_TX_ID_KEY, '$fabric_tx_id').
 -define(PDICT_TX_RES_KEY, '$fabric_tx_result').
 -define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun').
+-define(PDICT_FOLD_ACC_STATE, '$fabric_fold_acc_state').
+
+% Let's keep these in ascending order
+-define(TRANSACTION_TOO_OLD, 1007).
 -define(COMMIT_UNKNOWN_RESULT, 1021).
 
 
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 17c899d..3349722 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -872,6 +872,11 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
                 _ -> fwd
             end,
 
+            RestartTx = case fabric2_util:get_value(restart_tx, Options) of
+                undefined -> [{restart_tx, true}];
+                _AlreadySet -> []
+            end,
+
             StartKey = get_since_seq(TxDb, Dir, SinceSeq),
             EndKey = case Dir of
                 rev -> fabric2_util:seq_zero_vs();
@@ -880,7 +885,7 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
             FoldOpts = [
                 {start_key, StartKey},
                 {end_key, EndKey}
-            ] ++ Options,
+            ] ++ RestartTx ++ Options,
 
             {ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
                 {SeqVS} = erlfdb_tuple:unpack(K, Prefix),
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 99611b0..00bb485 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -72,6 +72,23 @@
 -include("fabric2.hrl").
 
 
+-define(MAX_FOLD_RANGE_RETRIES, 3).
+
+
+-record(fold_acc, {
+    db,
+    restart_tx,
+    start_key,
+    end_key,
+    limit,
+    skip,
+    retries,
+    base_opts,
+    user_fun,
+    user_acc
+}).
+
+
 transactional(Fun) ->
     do_transaction(Fun, undefined).
 
@@ -835,25 +852,49 @@ get_last_change(#{} = Db) ->
     end.
 
 
-fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
-    #{
-        tx := Tx
-    } = ensure_current(Db),
-    fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
-
-fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
-    case fabric2_util:get_value(limit, Options) of
-        0 ->
-            % FoundationDB treats a limit of 0 as unlimited
-            % so we have to guard for that here.
-            UserAcc;
-        _ ->
-            {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
-            Callback = fun fold_range_cb/2,
-            Acc = {skip, Skip, UserCallback, UserAcc},
-            {skip, _, UserCallback, OutAcc} =
-                    erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
-            OutAcc
+fold_range(TxOrDb, RangePrefix, UserFun, UserAcc, Options) ->
+    {Db, Tx} = case TxOrDb of
+        {tx, TxObj} ->
+            {undefined, TxObj};
+        #{} = DbObj ->
+            DbObj1 = #{tx := TxObj} = ensure_current(DbObj),
+            {DbObj1, TxObj}
+    end,
+    % FoundationDB treats a limit of 0 as unlimited so we guard against it
+    case fabric2_util:get_value(limit, Options) of 0 -> UserAcc; _ ->
+        FAcc = get_fold_acc(Db, RangePrefix, UserFun, UserAcc, Options),
+        try
+            fold_range(Tx, FAcc)
+        after
+            erase(?PDICT_FOLD_ACC_STATE)
+        end
+    end.
+
+
+fold_range(Tx, FAcc) ->
+    #fold_acc{
+        start_key = Start,
+        end_key = End,
+        limit = Limit,
+        base_opts = BaseOpts,
+        restart_tx = DoRestart
+    } = FAcc,
+    case DoRestart of false -> ok; true ->
+        ok = erlfdb:set_option(Tx, disallow_writes)
+    end,
+    Opts = [{limit, Limit} | BaseOpts],
+    Callback = fun fold_range_cb/2,
+    try
+        #fold_acc{
+            user_acc = FinalUserAcc
+        } = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts),
+        FinalUserAcc
+    catch error:{erlfdb_error, ?TRANSACTION_TOO_OLD} when DoRestart ->
+        % Possibly handle cluster_version_changed and future_version as well to
+        % continue iteration instead of falling back to transactional and
+        % retrying from the beginning, which is bound to fail when streaming
+        % data out to a socket.
+        fold_range(Tx, restart_fold(Tx, FAcc))
     end.
 
 
@@ -1297,7 +1338,9 @@ chunkify_binary(Data) ->
     end.
 
 
-get_fold_opts(RangePrefix, Options) ->
+get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options)
+        when is_map(Db) orelse Db =:= undefined ->
+
     Reverse = case fabric2_util:get_value(dir, Options) of
         rev -> true;
         _ -> false
@@ -1362,8 +1405,8 @@ get_fold_opts(RangePrefix, Options) ->
     end,
 
     Limit = case fabric2_util:get_value(limit, Options) of
-        L when is_integer(L), L >= 0 -> [{limit, L + Skip}];
-        undefined -> []
+        L when is_integer(L), L >= 0 -> L + Skip;
+        undefined -> 0
     end,
 
     TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
@@ -1381,21 +1424,68 @@ get_fold_opts(RangePrefix, Options) ->
         B when is_boolean(B) -> [{snapshot, B}]
     end,
 
-    OutOpts = [{reverse, Reverse}]
-            ++ Limit
+    BaseOpts = [{reverse, Reverse}]
             ++ TargetBytes
             ++ StreamingMode
             ++ Snapshot,
 
-    {StartKey3, EndKey3, Skip, OutOpts}.
+    RestartTx = fabric2_util:get_value(restart_tx, Options, false),
+
+    #fold_acc{
+        db = Db,
+        start_key = StartKey3,
+        end_key = EndKey3,
+        skip = Skip,
+        limit = Limit,
+        retries = 0,
+        base_opts = BaseOpts,
+        restart_tx = RestartTx,
+        user_fun = UserCallback,
+        user_acc = UserAcc
+    }.
+
 
+fold_range_cb({K, V}, #fold_acc{} = Acc) ->
+    #fold_acc{
+        skip = Skip,
+        limit = Limit,
+        user_fun = UserFun,
+        user_acc = UserAcc,
+        base_opts = Opts
+    } = Acc,
+    Acc1 = case Skip =:= 0 of
+        true ->
+            UserAcc1 = UserFun({K, V}, UserAcc),
+            Acc#fold_acc{limit = max(0, Limit - 1), user_acc = UserAcc1};
+        false ->
+            Acc#fold_acc{skip = Skip - 1, limit = Limit - 1}
+    end,
+    Acc2 = case fabric2_util:get_value(reverse, Opts, false) of
+        true -> Acc1#fold_acc{end_key = erlfdb_key:last_less_or_equal(K)};
+        false -> Acc1#fold_acc{start_key = erlfdb_key:first_greater_than(K)}
+    end,
+    put(?PDICT_FOLD_ACC_STATE, Acc2),
+    Acc2.
 
-fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
-    NewAcc = Callback(KV, Acc),
-    {skip, 0, Callback, NewAcc};
 
-fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
-    {skip, N - 1, Callback, Acc}.
+restart_fold(Tx, #fold_acc{} = Acc) ->
+    erase(?PDICT_CHECKED_MD_IS_CURRENT),
+    % Not actually committing anything so we skip on-commit handlers here. Those
+    % are usually to refresh db handles in the cache. If the iterator runs for
+    % a while it might be inserting a stale handle in there anyway.
+    erase({?PDICT_ON_COMMIT_FUN, Tx}),
+
+    ok = erlfdb:reset(Tx),
+
+    case {erase(?PDICT_FOLD_ACC_STATE), Acc#fold_acc.retries} of
+        {#fold_acc{db = Db} = Acc1, _} ->
+            Acc1#fold_acc{db = check_db_instance(Db), retries = 0};
+        {undefined, Retries} when Retries < ?MAX_FOLD_RANGE_RETRIES ->
+            Db = check_db_instance(Acc#fold_acc.db),
+            Acc#fold_acc{db = Db, retries = Retries + 1};
+        {undefined, _} ->
+            error(fold_range_not_progressing)
+    end.
 
 
 get_db_handle() ->
@@ -1435,6 +1525,30 @@ ensure_current(#{} = Db0, CheckDbVersion) ->
     end.
 
 
+check_db_instance(undefined) ->
+    undefined;
+
+check_db_instance(#{} = Db) ->
+    require_transaction(Db),
+    case check_metadata_version(Db) of
+        {current, Db1} ->
+            Db1;
+        {stale, Db1} ->
+            #{
+                tx := Tx,
+                uuid := UUID,
+                name := DbName,
+                layer_prefix := LayerPrefix
+            } = Db1,
+            DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+            UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
+            case erlfdb:wait(erlfdb:get(Tx, UUIDKey)) of
+                UUID -> Db1;
+                _ -> error(database_does_not_exist)
+            end
+    end.
+
+
 is_transaction_applied(Tx) ->
     is_commit_unknown_result()
         andalso has_transaction_id()
diff --git a/src/fabric/test/fabric2_changes_fold_tests.erl b/src/fabric/test/fabric2_changes_fold_tests.erl
index 8a29bcb..fddf180 100644
--- a/src/fabric/test/fabric2_changes_fold_tests.erl
+++ b/src/fabric/test/fabric2_changes_fold_tests.erl
@@ -21,28 +21,55 @@
 
 -define(DOC_COUNT, 25).
 
+-define(PDICT_ERROR_IN_FOLD_RANGE, '$fabric2_error_in_fold_range').
+-define(PDICT_ERROR_IN_USER_FUN, '$fabric2_error_throw_in_user_fun').
+
 
 changes_fold_test_() ->
     {
         "Test changes fold operations",
         {
             setup,
-            fun setup/0,
-            fun cleanup/1,
-            with([
-                ?TDEF(fold_changes_basic),
-                ?TDEF(fold_changes_since_now),
-                ?TDEF(fold_changes_since_seq),
-                ?TDEF(fold_changes_basic_rev),
-                ?TDEF(fold_changes_since_now_rev),
-                ?TDEF(fold_changes_since_seq_rev)
-            ])
+            fun setup_all/0,
+            fun teardown_all/1,
+            {
+                foreach,
+                fun setup/0,
+                fun cleanup/1,
+                [
+                    ?TDEF_FE(fold_changes_basic),
+                    ?TDEF_FE(fold_changes_since_now),
+                    ?TDEF_FE(fold_changes_since_seq),
+                    ?TDEF_FE(fold_changes_basic_rev),
+                    ?TDEF_FE(fold_changes_since_now_rev),
+                    ?TDEF_FE(fold_changes_since_seq_rev),
+                    ?TDEF_FE(fold_changes_basic_tx_too_long),
+                    ?TDEF_FE(fold_changes_reverse_tx_too_long),
+                    ?TDEF_FE(fold_changes_tx_too_long_with_single_row_emits),
+                    ?TDEF_FE(fold_changes_since_seq_tx_too_long),
+                    ?TDEF_FE(fold_changes_not_progressing)
+                ]
+            }
         }
     }.
 
 
-setup() ->
+setup_all() ->
     Ctx = test_util:start_couch([fabric]),
+    meck:new(erlfdb, [passthrough]),
+    Ctx.
+
+
+teardown_all(Ctx) ->
+    meck:unload(),
+    test_util:stop_couch(Ctx).
+
+
+setup() ->
+    meck:expect(erlfdb, fold_range, fun(Tx, Start, End, Callback, Acc, Opts) ->
+        maybe_tx_too_long(?PDICT_ERROR_IN_FOLD_RANGE),
+        meck:passthrough([Tx, Start, End, Callback, Acc, Opts])
+    end),
     {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]),
     Rows = lists:map(fun(Val) ->
         DocId = fabric2_util:uuid(),
@@ -59,57 +86,193 @@ setup() ->
             rev_id => RevId
         }
     end, lists:seq(1, ?DOC_COUNT)),
-    {Db, Rows, Ctx}.
+    {Db, Rows}.
 
 
-cleanup({Db, _DocIdRevs, Ctx}) ->
-    ok = fabric2_db:delete(fabric2_db:name(Db), []),
-    test_util:stop_couch(Ctx).
+cleanup({Db, _DocIdRevs}) ->
+    reset_error_counts(),
+    ok = fabric2_db:delete(fabric2_db:name(Db), []).
 
 
-fold_changes_basic({Db, DocRows, _}) ->
-    {ok, Rows} = fabric2_db:fold_changes(Db, 0, fun fold_fun/2, []),
-    ?assertEqual(lists:reverse(DocRows), Rows).
+fold_changes_basic({Db, DocRows}) ->
+    ?assertEqual(lists:reverse(DocRows), changes(Db)).
 
 
-fold_changes_since_now({Db, _, _}) ->
-    {ok, Rows} = fabric2_db:fold_changes(Db, now, fun fold_fun/2, []),
-    ?assertEqual([], Rows).
+fold_changes_since_now({Db, _}) ->
+    ?assertEqual([], changes(Db, now, [])).
 
 
-fold_changes_since_seq({_, [], _}) ->
+fold_changes_since_seq({_, []}) ->
     ok;
 
-fold_changes_since_seq({Db, [Row | RestRows], _}) ->
+fold_changes_since_seq({Db, [Row | RestRows]}) ->
     #{sequence := Since} = Row,
-    {ok, Rows} = fabric2_db:fold_changes(Db, Since, fun fold_fun/2, []),
-    ?assertEqual(lists:reverse(RestRows), Rows),
-    fold_changes_since_seq({Db, RestRows, nil}).
+    ?assertEqual(lists:reverse(RestRows), changes(Db, Since, [])),
+    fold_changes_since_seq({Db, RestRows}).
 
 
-fold_changes_basic_rev({Db, _, _}) ->
-    Opts = [{dir, rev}],
-    {ok, Rows} = fabric2_db:fold_changes(Db, 0, fun fold_fun/2, [], Opts),
-    ?assertEqual([], Rows).
+fold_changes_basic_rev({Db, _}) ->
+    ?assertEqual([], changes(Db, 0, [{dir, rev}])).
 
 
-fold_changes_since_now_rev({Db, DocRows, _}) ->
-    Opts = [{dir, rev}],
-    {ok, Rows} = fabric2_db:fold_changes(Db, now, fun fold_fun/2, [], Opts),
-    ?assertEqual(DocRows, Rows).
+fold_changes_since_now_rev({Db, DocRows}) ->
+    ?assertEqual(DocRows, changes(Db, now, [{dir, rev}])).
 
 
-fold_changes_since_seq_rev({_, [], _}) ->
+fold_changes_since_seq_rev({_, []}) ->
     ok;
 
-fold_changes_since_seq_rev({Db, DocRows, _}) ->
+fold_changes_since_seq_rev({Db, DocRows}) ->
     #{sequence := Since} = lists:last(DocRows),
     Opts = [{dir, rev}],
-    {ok, Rows} = fabric2_db:fold_changes(Db, Since, fun fold_fun/2, [], Opts),
-    ?assertEqual(DocRows, Rows),
+    ?assertEqual(DocRows, changes(Db, Since, Opts)),
     RestRows = lists:sublist(DocRows, length(DocRows) - 1),
-    fold_changes_since_seq_rev({Db, RestRows, nil}).
+    fold_changes_since_seq_rev({Db, RestRows}).
+
+
+fold_changes_basic_tx_too_long({Db, DocRows0}) ->
+    DocRows = lists:reverse(DocRows0),
+
+    tx_too_long_errors(0, 1),
+    ?assertEqual(DocRows, changes(Db)),
+
+    tx_too_long_errors(1, 0),
+    ?assertEqual(DocRows, changes(Db)),
+
+    % Blow up in user fun but after emitting one row successfully.
+    tx_too_long_errors({1, 1}, 0),
+    ?assertEqual(DocRows, changes(Db)),
+
+    % Blow up before last document
+    tx_too_long_errors({?DOC_COUNT - 1, 1}, 0),
+    ?assertEqual(DocRows, changes(Db)),
+
+    % Emit one value, then blow up in user function and then blow up twice in
+    % fold_range. But it is not enough to stop the iteration.
+    tx_too_long_errors({1, 1}, {1, 2}),
+    ?assertEqual(DocRows, changes(Db)).
+
+
+fold_changes_reverse_tx_too_long({Db, DocRows}) ->
+    Opts = [{dir, rev}],
+
+    tx_too_long_errors(0, 1),
+    ?assertEqual([], changes(Db, 0, Opts)),
+
+    tx_too_long_errors(1, 0),
+    ?assertEqual([], changes(Db, 0, Opts)),
+
+    tx_too_long_errors(1, 0),
+    ?assertEqual(DocRows, changes(Db, now, Opts)),
+
+    tx_too_long_errors(1, 0),
+    ?assertEqual(DocRows, changes(Db, now, Opts)),
+
+    % Blow up in user fun but after emitting one row successfully.
+    tx_too_long_errors({1, 1}, 0),
+    ?assertEqual(DocRows, changes(Db, now, Opts)),
+
+    % Blow up before last document
+    tx_too_long_errors({?DOC_COUNT - 1, 1}, 0),
+    ?assertEqual(DocRows, changes(Db, now, Opts)),
+
+    % Emit value, blow up in user function, and twice in fold_range
+    tx_too_long_errors({1, 1}, {1, 2}),
+    ?assertEqual(DocRows, changes(Db, now, Opts)).
+
+
+fold_changes_tx_too_long_with_single_row_emits({Db, DocRows0}) ->
+    % This test does a few basic operations while forcing erlfdb range fold to
+    % emit a single row at a time, thus forcing it to use continuations while
+    % also inducing tx errors
+    Opts = [{target_bytes, 1}],
+    DocRows = lists:reverse(DocRows0),
+
+    tx_too_long_errors(0, 1),
+    ?assertEqual(DocRows, changes(Db, 0, Opts)),
+
+    tx_too_long_errors(1, 0),
+    ?assertEqual(DocRows, changes(Db, 0, Opts)),
+
+    % Blow up in user fun but after emitting one row successfully.
+    tx_too_long_errors({1, 1}, 0),
+    ?assertEqual(DocRows, changes(Db, 0, Opts)),
+
+    % Blow up before last document
+    tx_too_long_errors({?DOC_COUNT - 1, 1}, 0),
+    ?assertEqual(DocRows, changes(Db, 0, Opts)).
+
+
+fold_changes_since_seq_tx_too_long({Db, Rows}) ->
+    % Blow up after a successful emit, then twice
+    % in range fold call. Also re-use already existing basic
+    % fold_changes_since_seq test function.
+    tx_too_long_errors({1, 1}, {1, 2}),
+    fold_changes_since_seq({Db, Rows}).
+
+
+fold_changes_not_progressing({Db, _}) ->
+    % Fail in first fold range call.
+    tx_too_long_errors(5, 0),
+    ?assertError(fold_range_not_progressing, changes(Db)),
+
+    % Fail in first user fun call.
+    tx_too_long_errors(0, 5),
+    ?assertError(fold_range_not_progressing, changes(Db)),
+
+    % Blow up in last user fun call
+    tx_too_long_errors({?DOC_COUNT - 1, 5}, 0),
+    ?assertError(fold_range_not_progressing, changes(Db)),
+
+    % Blow up in user function after one success.
+    tx_too_long_errors({1, 5}, 0),
+    ?assertError(fold_range_not_progressing, changes(Db)),
+
+    % Emit value, blow up in user function, then keep blowing up in fold_range.
+    tx_too_long_errors({1, 1}, {1, 4}),
+    ?assertError(fold_range_not_progressing, changes(Db)).
 
 
 fold_fun(#{} = Change, Acc) ->
+    maybe_tx_too_long(?PDICT_ERROR_IN_USER_FUN),
     {ok, [Change | Acc]}.
+
+
+tx_too_long_errors(UserFunCount, FoldErrors) when is_integer(UserFunCount) ->
+    tx_too_long_errors({0, UserFunCount}, FoldErrors);
+
+tx_too_long_errors(UserFunErrors, FoldCount) when is_integer(FoldCount) ->
+    tx_too_long_errors(UserFunErrors, {0, FoldCount});
+
+tx_too_long_errors({UserFunSkip, UserFunCount}, {FoldSkip, FoldCount}) ->
+    reset_error_counts(),
+    put(?PDICT_ERROR_IN_USER_FUN, {UserFunSkip, UserFunCount}),
+    put(?PDICT_ERROR_IN_FOLD_RANGE, {FoldSkip, FoldCount}).
+
+
+reset_error_counts() ->
+    erase(?PDICT_ERROR_IN_FOLD_RANGE),
+    erase(?PDICT_ERROR_IN_USER_FUN).
+
+
+changes(Db) ->
+    changes(Db, 0, []).
+
+
+changes(Db, Since, Opts) ->
+    {ok, Rows} = fabric2_db:fold_changes(Db, Since, fun fold_fun/2, [], Opts),
+    Rows.
+
+
+maybe_tx_too_long(Key) ->
+    case get(Key) of
+        {Skip, Count} when is_integer(Skip), Skip > 0 ->
+            put(Key, {Skip - 1, Count});
+        {0, Count} when is_integer(Count), Count > 0 ->
+            put(Key, {0, Count - 1}),
+            error({erlfdb_error, 1007});
+        {0, 0} ->
+            ok;
+        undefined ->
+            ok
+    end.
diff --git a/src/fabric/test/fabric2_test.hrl b/src/fabric/test/fabric2_test.hrl
index a0532b3..9239096 100644
--- a/src/fabric/test/fabric2_test.hrl
+++ b/src/fabric/test/fabric2_test.hrl
@@ -10,9 +10,17 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
+
+% Some test modules do not use with, so squash the unused fun compiler warning
+-compile([{nowarn_unused_function, [{with, 1}]}]).
+
+
 -define(TDEF(Name), {atom_to_list(Name), fun Name/1}).
 -define(TDEF(Name, Timeout), {atom_to_list(Name), Timeout, fun Name/1}).
 
+-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end).
+-define(TDEF_FE(Name, Timeout), fun(Arg) -> {atom_to_list(Name), {timeout, Timeout, ?_test(Name(Arg))}} end).
+
 
 with(Tests) ->
     fun(ArgsTuple) ->