You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2021/08/26 21:56:09 UTC

[couchdb] branch 3.x updated: Avoid change feed rewinds after shard moves

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/3.x by this push:
     new e83935c  Avoid change feed rewinds after shard moves
e83935c is described below

commit e83935c7f8c3e47b47f07f22ece327f6529d4da0
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Tue Aug 24 18:44:21 2021 -0400

    Avoid change feed rewinds after shard moves
    
    When shards are moved to new nodes, and the user supplies a change sequence
    from the old shard map configuration, attempt to match missing nodes and ranges
    by inspecting current shard uuids in order to avoid rewinds.
    
    Previously, if a node and range was missing, we randomly picked a node in the
    appropriate range, so 1/3 of the time we might have hit the exact node, but 2/3
    of the time we would end up with a complete changes feed rewind to 0.
    
    Unfortunately, this involves a fabric worker scatter gather operation to all
    shard copies. This should only happen when we get an old sequence. We rely on
    that happening rarely, mostly right after the shards moved, then users would
    get new sequence from the recent shard map.
---
 src/fabric/src/fabric.erl                          |   7 +-
 src/fabric/src/fabric_db_uuids.erl                 |  67 +++++++++++
 src/fabric/src/fabric_ring.erl                     |  54 ++++++++-
 src/fabric/src/fabric_rpc.erl                      |  10 +-
 src/fabric/src/fabric_util.erl                     |  55 ++++++++++
 src/fabric/src/fabric_view_changes.erl             |  57 +++++++++-
 src/fabric/test/eunit/fabric_db_uuids_tests.erl    |  51 +++++++++
 .../test/eunit/fabric_moved_shards_seq_tests.erl   | 122 +++++++++++++++++++++
 8 files changed, 414 insertions(+), 9 deletions(-)

diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 34b967d..6386034 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -37,7 +37,7 @@
 % miscellany
 -export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0,
     cleanup_index_files/1, cleanup_index_files_all_nodes/1, dbname/1,
-    inactive_index_files/1]).
+    inactive_index_files/1, db_uuids/1]).
 
 -include_lib("fabric/include/fabric.hrl").
 
@@ -551,6 +551,11 @@ dbname(Db) ->
         erlang:error({illegal_database_name, Db})
     end.
 
+%% @doc get db shard uuids
+-spec db_uuids(dbname())  -> map().
+db_uuids(DbName) ->
+    fabric_db_uuids:go(dbname(DbName)).
+
 name(Thing) ->
     couch_util:to_binary(Thing).
 
diff --git a/src/fabric/src/fabric_db_uuids.erl b/src/fabric/src/fabric_db_uuids.erl
new file mode 100644
index 0000000..a440d74
--- /dev/null
+++ b/src/fabric/src/fabric_db_uuids.erl
@@ -0,0 +1,67 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_uuids).
+
+
+-export([go/1]).
+
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+go(DbName) when is_binary(DbName) ->
+    Shards = mem3:live_shards(DbName, [node() | nodes()]),
+    Workers = fabric_util:submit_jobs(Shards, get_uuid, []),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Acc0 = {fabric_dict:init(Workers, nil), []},
+    try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+        {timeout, {WorkersDict, _}} ->
+            DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+            fabric_util:log_timeout(DefunctWorkers, "db_uuids"),
+            {error, timeout};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+
+handle_message({rexi_DOWN, _, {_, NodeRef},_}, _Shard, {Cntrs, Res}) ->
+    case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, {nodedown, <<"progress not possible">>}}
+    end;
+
+handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end;
+
+handle_message(Uuid, Shard, {Cntrs, Res}) when is_binary(Uuid) ->
+    case fabric_ring:handle_response(Shard, Uuid, Cntrs, Res, [all]) of
+        {ok, {Cntrs1, Res1}} ->
+            {ok, {Cntrs1, Res1}};
+        {stop, Res1} ->
+            Uuids = fabric_dict:fold(fun(#shard{} = S, Id, #{} = Acc) ->
+                Acc#{Id => S#shard{ref = undefined}}
+            end, #{}, Res1),
+            {stop, Uuids}
+    end;
+
+handle_message(Reason, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end.
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl
index 110edb9..bad0f42 100644
--- a/src/fabric/src/fabric_ring.erl
+++ b/src/fabric/src/fabric_ring.erl
@@ -122,6 +122,9 @@ handle_response(Shard, Response, Workers, Responses, RingOpts) ->
 %    a partitioned database. As soon as a result from any of the shards
 %    arrives, result collection stops.
 %
+%  * When RingOpts is [all], responses are accepted until all the shards return
+%  results
+%
 handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
     Workers1 = fabric_dict:erase(Shard, Workers),
     case RingOpts of
@@ -130,7 +133,10 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
             Responses1 = [{{B, E}, Shard, Response} | Responses],
             handle_response_ring(Workers1, Responses1, CleanupCb);
         [{any, Any}] ->
-            handle_response_any(Shard, Response, Workers1, Any, CleanupCb)
+            handle_response_any(Shard, Response, Workers1, Any, CleanupCb);
+        [all] ->
+            Responses1 = [{Shard, Response} | Responses],
+            handle_response_all(Workers1, Responses1)
     end.
 
 
@@ -164,6 +170,15 @@ handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
     end.
 
 
+handle_response_all(Workers, Responses) ->
+    case fabric_dict:size(Workers) =:= 0 of
+        true ->
+            {stop, fabric_dict:from_list(Responses)};
+        false ->
+            {ok, {Workers, Responses}}
+    end.
+
+
 % Check if workers still waiting and the already received responses could
 % still form a continous range. The range won't always be the full ring, and
 % the bounds are computed based on the minimum and maximum interval beginning
@@ -186,6 +201,9 @@ is_progress_possible(Counters, Responses, MinB, MaxE, []) ->
     Ranges = fabric_util:worker_ranges(Counters) ++ ResponseRanges,
     mem3_util:get_ring(Ranges, MinB, MaxE) =/= [];
 
+is_progress_possible(Counters, _Responses, _, _, [all]) ->
+    fabric_dict:size(Counters) > 0;
+
 is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) ->
     InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end,
     case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of
@@ -305,7 +323,7 @@ is_progress_possible_with_responses_test() ->
     ?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])).
 
 
-is_progress_possible_with_ring_opts_test() ->
+is_progress_possible_with_ring_opts_any_test() ->
     Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}],
     C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
     RS1 = mk_resps([{"n1", 3, 10, 42}]),
@@ -323,6 +341,12 @@ is_progress_possible_with_ring_opts_test() ->
     ?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)).
 
 
+is_progress_possible_with_ring_opts_all_test() ->
+    C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
+    ?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [all])),
+    ?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, [all])).
+
+
 get_shard_replacements_test() ->
     Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [
         {"n1", 11, 20}, {"n1", 21, ?RING_END},
@@ -422,7 +446,7 @@ handle_response_backtracking_test() ->
     ?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4).
 
 
-handle_response_ring_opts_test() ->
+handle_response_ring_opts_any_test() ->
     Shard1 = mk_shard("n1", [0, 5]),
     Shard2 = mk_shard("n2", [0, 1]),
     Shard3 = mk_shard("n3", [0, 1]),
@@ -446,6 +470,30 @@ handle_response_ring_opts_test() ->
     ?assertEqual({stop, [{Shard3, 44}]}, Result3).
 
 
+handle_response_ring_opts_all_test() ->
+    Shard1 = mk_shard("n1", [0, 5]),
+    Shard2 = mk_shard("n2", [0, 1]),
+    Shard3 = mk_shard("n3", [0, 1]),
+
+    ShardList = [Shard1, Shard2, Shard3],
+    [W1, W2, W3] = WithRefs = [S#shard{ref = make_ref()} || S <- ShardList],
+    Workers1 = fabric_dict:init(WithRefs, nil),
+
+    Result1 = handle_response(W1, 42, Workers1, [], [all], undefined),
+    ?assertMatch({ok, {_, _}}, Result1),
+    {ok, {Workers2, _}} = Result1,
+
+    % Even though n2 and n3 cover the same range, with 'all' option we wait for
+    % all workers to return.
+    Result2 = handle_response(W2, 43, Workers2, [], [all], undefined),
+    ?assertMatch({ok, {_, _}}, Result2),
+    {ok, {Workers3, _}} = Result2,
+
+    % Stop only after all the shards respond
+    Result3 = handle_response(W3, 44, Workers3, [], [all], undefined),
+    ?assertMatch({stop, [_ | _]}, Result3).
+
+
 handle_error_test() ->
     Shard1 = mk_shard("n1", [0, 5]),
     Shard2 = mk_shard("n1", [10, ?RING_END]),
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 9ed8efd..4330f92 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -26,7 +26,7 @@
 
 -export([get_db_info/2, get_doc_count/2, get_design_doc_count/2,
          get_update_seq/2, changes/4, map_view/5, reduce_view/5,
-         group_info/3, update_mrview/4]).
+         group_info/3, update_mrview/4, get_uuid/1]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("couch/include/couch_db.hrl").
@@ -318,6 +318,9 @@ compact(ShardName, DesignName) ->
     Ref = erlang:make_ref(),
     Pid ! {'$gen_call', {self(), Ref}, compact}.
 
+get_uuid(DbName) ->
+    with_db(DbName, [], {couch_db, get_uuid, []}).
+
 %%
 %% internal
 %%
@@ -637,10 +640,9 @@ calculate_start_seq(Db, Node, Seq) ->
 
 uuid(Db) ->
     Uuid = couch_db:get_uuid(Db),
-    binary:part(Uuid, {0, uuid_prefix_len()}).
+    Prefix = fabric_util:get_uuid_prefix_len(),
+    binary:part(Uuid, {0, Prefix}).
 
-uuid_prefix_len() ->
-    list_to_integer(config:get("fabric", "uuid_prefix_len", "7")).
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 84ffef1..9dd8e71 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -23,6 +23,9 @@
 -export([validate_all_docs_args/2, validate_args/3]).
 -export([upgrade_mrargs/1]).
 -export([worker_ranges/1]).
+-export([get_uuid_prefix_len/0]).
+-export([isolate/1, isolate/2]).
+
 
 -compile({inline, [{doc_id_and_rev,1}]}).
 
@@ -345,3 +348,55 @@ worker_ranges(Workers) ->
         [{X, Y} | Acc]
     end, [], Workers),
     lists:usort(Ranges).
+
+
+get_uuid_prefix_len() ->
+    config:get_integer("fabric", "uuid_prefix_len", 7).
+
+
+% If we issue multiple fabric calls from the same process we have to isolate
+% them so in case of error they don't pollute the processes dictionary or the
+% mailbox
+
+isolate(Fun) ->
+    isolate(Fun, infinity).
+
+
+isolate(Fun, Timeout) ->
+    {Pid, Ref} = erlang:spawn_monitor(fun() -> exit(do_isolate(Fun)) end),
+    receive
+        {'DOWN', Ref, _, _, {'$isolres', Res}} ->
+            Res;
+        {'DOWN', Ref, _, _, {'$isolerr', Tag, Reason, Stack}} ->
+            erlang:raise(Tag, Reason, Stack)
+    after Timeout ->
+        erlang:demonitor(Ref, [flush]),
+        exit(Pid, kill),
+        erlang:error(timeout)
+    end.
+
+
+% OTP_RELEASE is defined in OTP 21+ only
+-ifdef(OTP_RELEASE).
+
+
+do_isolate(Fun) ->
+    try
+        {'$isolres', Fun()}
+    catch Tag:Reason:Stack ->
+        {'$isolerr', Tag, Reason, Stack}
+    end.
+
+
+-else.
+
+
+do_isolate(Fun) ->
+    try
+        {'$isolres', Fun()}
+    catch ?STACKTRACE(Tag, Reason, Stack)
+        {'$isolerr', Tag, Reason, Stack}
+    end.
+
+
+-endif.
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index febbd31..beeaece 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -453,12 +453,14 @@ do_unpack_seqs(Opaque, DbName) ->
         true ->
             Unpacked;
         false ->
+            Uuids = get_db_uuid_shards(DbName),
             PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) ->
                 case mem3:get_shard(DbName, Node, [A, B]) of
                     {ok, Shard} ->
                         {Shard, Seq};
                     {error, not_found} ->
-                        {#shard{node = Node, range = [A, B]}, Seq}
+                        Shard = replace_moved_shard(Node, [A, B], Seq, Uuids),
+                        {Shard, Seq}
                 end
             end, Deduped),
             Shards = mem3:shards(DbName),
@@ -495,6 +497,59 @@ get_old_seq(#shard{range=R}=Shard, SinceSeqs) ->
     end.
 
 
+get_db_uuid_shards(DbName) ->
+    % Need to use an isolated process as we are performing a fabric call from
+    % another fabric call and there is a good chance we'd polute the mailbox
+    % with returned messages
+    Timeout = fabric_util:request_timeout(),
+    IsolatedFun = fun() -> fabric:db_uuids(DbName) end,
+    try fabric_util:isolate(IsolatedFun, Timeout) of
+        {ok, Uuids} ->
+            % Trim uuids so we match exactly based on the currently configured
+            % uuid_prefix_len. The assumption is that we are getting an older
+            % sequence from the same cluster and we didn't tweak that
+            % relatively obscure config option in the meantime.
+            PrefixLen = fabric_util:get_uuid_prefix_len(),
+            maps:fold(fun(Uuid, Shard, Acc) ->
+                TrimmedUuid = binary:part(Uuid, {0, PrefixLen}),
+                Acc#{TrimmedUuid => Shard}
+            end, #{}, Uuids);
+        {error, Error} ->
+            % Since we are doing a best-effort approach to match moved shards,
+            % tolerate and log errors. This should also handle cases when the
+            % cluster is partially upgraded, as some nodes will not have the
+            % newer get_uuid fabric_rpc handler.
+            ErrMsg = "~p : could not get db_uuids for Db:~p Error:~p",
+            couch_log:error(ErrMsg, [?MODULE, DbName, Error]),
+            #{}
+    catch
+        _Tag:Error ->
+            ErrMsg = "~p : could not get db_uuids for Db:~p Error:~p",
+            couch_log:error(ErrMsg, [?MODULE, DbName, Error]),
+            #{}
+    end.
+
+
+%% Determine if the missing shard moved to a new node. Do that by matching the
+%% uuids from the current shard map. If we cannot find a moved shard, we return
+%% the original node and range as a shard and hope for the best.
+replace_moved_shard(Node, Range, Seq, #{} = _UuidShards) when is_number(Seq) ->
+    % Cannot figure out shard moves without uuid matching
+    #shard{node = Node, range = Range};
+replace_moved_shard(Node, Range, {Seq, Uuid}, #{} = UuidShards) ->
+    % Compatibility case for an old seq format which didn't have epoch nodes
+    replace_moved_shard(Node, Range, {Seq, Uuid, Node}, UuidShards);
+replace_moved_shard(Node, Range, {_Seq, Uuid, _EpochNode}, #{} = UuidShards) ->
+    case UuidShards of
+        #{Uuid := #shard{range = Range} = Shard} ->
+            % Found a moved shard by matching both the uuid and the range
+            Shard;
+        #{} ->
+            % Did not find a moved shard, use the original node
+            #shard{node = Node, range = Range}
+    end.
+
+
 changes_row(Props0) ->
     Props1 = case couch_util:get_value(deleted, Props0) of
         true ->
diff --git a/src/fabric/test/eunit/fabric_db_uuids_tests.erl b/src/fabric/test/eunit/fabric_db_uuids_tests.erl
new file mode 100644
index 0000000..986945b
--- /dev/null
+++ b/src/fabric/test/eunit/fabric_db_uuids_tests.erl
@@ -0,0 +1,51 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_uuids_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-define(TDEF(A), {atom_to_list(A), fun A/0}).
+
+main_test_() ->
+    {
+        setup,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF(t_can_get_shard_uuids)
+        ]
+    }.
+
+setup() ->
+    test_util:start_couch([fabric]).
+
+teardown(Ctx) ->
+    meck:unload(),
+    test_util:stop_couch(Ctx).
+
+t_can_get_shard_uuids() ->
+    DbName = ?tempdb(),
+    ok = fabric:create_db(DbName, []),
+    Shards = mem3:shards(DbName),
+    {ok, Uuids} = fabric:db_uuids(DbName),
+    ?assertEqual(length(Shards), map_size(Uuids)),
+    UuidsFromShards = lists:foldl(fun(#shard{} = Shard, Acc) ->
+        Uuid = couch_util:with_db(Shard#shard.name, fun(Db) ->
+            couch_db:get_uuid(Db)
+        end),
+        Acc#{Uuid => Shard}
+    end, #{}, Shards),
+    ?assertEqual(UuidsFromShards, Uuids),
+    ok = fabric:delete_db(DbName, []).
diff --git a/src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl b/src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl
new file mode 100644
index 0000000..a78d17a
--- /dev/null
+++ b/src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl
@@ -0,0 +1,122 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_moved_shards_seq_tests).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-define(TDEF(A), {atom_to_list(A), fun A/0}).
+
+
+main_test_() ->
+    {
+        setup,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF(t_shard_moves_avoid_sequence_rewinds)
+        ]
+    }.
+
+
+setup() ->
+    test_util:start_couch([fabric]).
+
+
+teardown(Ctx) ->
+    meck:unload(),
+    test_util:stop_couch(Ctx).
+
+
+t_shard_moves_avoid_sequence_rewinds() ->
+    DocCnt = 30,
+    DbName = ?tempdb(),
+
+    ok = fabric:create_db(DbName, [{q,1}, {n,1}]),
+    lists:foreach(fun(I) ->
+        update_doc(DbName, #doc{id = erlang:integer_to_binary(I)})
+    end, lists:seq(1, DocCnt)),
+
+    {ok, _, Seq1, 0} = changes(DbName, #changes_args{limit = 1, since = "now"}),
+    [{_, Range, {Seq, Uuid, _}}] = seq_decode(Seq1),
+
+    % Transform Seq1 pretending it came from a fake source node, before the
+    % shard was moved to the current node.
+    SrcNode = 'srcnode@srchost',
+    Seq2 = seq_encode([{SrcNode, Range, {Seq, Uuid, SrcNode}}]),
+
+    % First, check when the shard file epoch is mismatched epoch and the
+    % sequence would rewind. This ensures the epoch and uuid check protection
+    % in couch_db works as intended.
+    Result1 = changes(DbName, #changes_args{limit = 1, since = Seq2}),
+    ?assertMatch({ok, _, _, _}, Result1),
+    {ok, _, _, PendingRewind} = Result1,
+    ?assertEqual(DocCnt - 1, PendingRewind),
+
+    % Mock epoch checking to pretend that shard actually used to live on
+    % SrcNode. In this case, we should not have rewinds.
+    mock_epochs([{node(), DocCnt}, {SrcNode, 1}]),
+    Result2 = changes(DbName, #changes_args{limit = 1, since = Seq2}),
+    ?assertMatch({ok, _, _, _}, Result2),
+    {ok, _, _, PendingNoRewind} = Result2,
+    ?assertEqual(0, PendingNoRewind),
+
+    ok = fabric:delete_db(DbName, []).
+
+
+changes_callback(start, Acc) ->
+    {ok, Acc};
+
+changes_callback({change, {Change}}, Acc) ->
+    CM = maps:from_list(Change),
+    {ok, [CM | Acc]};
+
+changes_callback({stop, EndSeq, Pending}, Acc) ->
+    {ok, Acc, EndSeq, Pending}.
+
+
+changes(DbName, #changes_args{} = Args) ->
+    fabric_util:isolate(fun() ->
+        fabric:changes(DbName, fun changes_callback/2, [], Args)
+    end).
+
+
+update_doc(DbName, #doc{} = Doc) ->
+    fabric_util:isolate(fun() ->
+        case fabric:update_doc(DbName, Doc, [?ADMIN_CTX]) of
+            {ok, Res} -> Res
+        end
+    end).
+
+
+seq_decode(Seq) ->
+    % This is copied from fabric_view_changes
+    Pattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$",
+    Options = [{capture, [opaque], binary}],
+    {match, Seq1} = re:run(Seq, Pattern, Options),
+    binary_to_term(couch_util:decodeBase64Url(Seq1)).
+
+
+seq_encode(Unpacked) ->
+    % Copied from fabric_view_changes
+    Opaque = couch_util:encodeBase64Url(term_to_binary(Unpacked, [compressed])),
+    ?l2b(["30", $-, Opaque]).
+
+
+mock_epochs(Epochs) ->
+    % Since we made up a node name we'll have to mock epoch checking
+    meck:new(couch_db_engine, [passthrough]),
+    meck:expect(couch_db_engine, get_epochs, fun(_) -> Epochs end).