[couchdb] branch shard-split-changes-feed-test created

eiri pushed a change to branch shard-split-changes-feed-test
      at 7cd1026  Add tests for shard's split changes feed

     new 7cd1026  Add tests for shard's split changes feed

[couchdb] 01/01: Add tests for shard's split changes feed

eiri pushed a commit to branch shard-split-changes-feed-test
commit 7cd1026d4de113e8c018f7190aca709b27b2e53d
Author: Eric Avdey <>
AuthorDate: Thu Feb 28 16:54:13 2019 -0400

    Add tests for shard's split changes feed
 src/mem3/test/mem3_reshard_changes_feed_test.erl | 447 +++++++++++++++++++++++
 1 file changed, 447 insertions(+)

+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-define(assertChanges(Expected, Received),
+    begin
+        ((fun() ->
+            ExpectedIDs = lists:sort([I || #{id := I} <- Expected]),
+            ReceivedIDs = lists:sort([I || #{id := I} <- Received]),
+            ?assertEqual(ExpectedIDs, ReceivedIDs)
+        end)())
+    end).
+setup() ->
+    Db1 = ?tempdb(),
+    create_db(Db1, [{q, 1}, {n, 1}]),
+    #{db1 => Db1}.
+teardown(#{} = Dbs) ->
+    mem3_reshard:reset_state(),
+    maps:map(fun(_, Db) -> delete_db(Db) end, Dbs).
+start_couch() ->
+    test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]).
+stop_couch(Ctx) ->
+    test_util:stop_couch(Ctx).
+mem3_reshard_changes_feed_test_() ->
+    {
+        "mem3 shard split changes feed tests",
+        {
+            setup,
+            fun start_couch/0, fun stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun normal_feed_should_work_after_split/1,
+                    fun continuous_feed_should_work_during_split/1
+                ]
+            }
+        }
+    }.
+normal_feed_should_work_after_split(#{db1 := Db}) ->
+    ?_test(begin
+        DocSpec = #{
+            docs => [1, 10],
+            delete => [5, 6]
+        },
+        add_test_docs(Db, DocSpec),
+        % gather pre-shard changes
+        BaseArgs = #changes_args{feed = "normal", dir = fwd, since = 0},
+        {ok, OldChanges, OldEndSeq} = get_changes_feed(Db, BaseArgs),
+        % Split the shard
+        split_and_wait(Db),
+        % verify changes list consistent for all the old seqs
+        lists:foldl(fun(#{seq := Seq} = C, ExpectedChanges) ->
+            Args = BaseArgs#changes_args{since = Seq},
+            {ok, Changes, _EndSeq} = get_changes_feed(Db, Args),
+            ?assertChanges(ExpectedChanges, Changes),
+            [C | ExpectedChanges]
+        end, [], OldChanges),
+        % confirm that old LastSeq respected
+        Args1 = BaseArgs#changes_args{since = OldEndSeq},
+        {ok, Changes1, EndSeq1} = get_changes_feed(Db, Args1),
+        ?assertChanges([], Changes1),
+        % confirm that new LastSeq also respected
+        Args2 = BaseArgs#changes_args{since = EndSeq1},
+        {ok, Changes2, EndSeq2} = get_changes_feed(Db, Args2),
+        ?assertChanges([], Changes2),
+        ?assertEqual(EndSeq2, EndSeq1),
+        % confirm we didn't lost any changes and have consistent last seq
+        {ok, Changes3, EndSeq3} = get_changes_feed(Db, BaseArgs),
+        ?assertChanges(OldChanges, Changes3),
+        %% FIXME. last_seq should be consistent
+        % ?assertEqual(EndSeq3, EndSeq2),
+        % add some docs
+        add_test_docs(Db, #{docs => [11, 15]}),
+        Args4 = BaseArgs#changes_args{since = EndSeq3},
+        {ok, Changes4, EndSeq4} = get_changes_feed(Db, Args4),
+        AddedChanges = [#{id => ID} || #doc{id = ID} <- docs([11, 15])],
+        ?assertChanges(AddedChanges, Changes4),
+        % confirm include_docs and deleted works
+        Args5 = BaseArgs#changes_args{include_docs = true},
+        {ok, Changes5, EndSeq5} = get_changes_feed(Db, Args5),
+        ?assertEqual(EndSeq4, EndSeq5),
+        [SampleChange] = [C || #{id := ID} = C <- Changes5, ID == <<"00005">>],
+        ?assertMatch(#{deleted := true}, SampleChange),
+        ?assertMatch(#{doc := {Body}} when is_list(Body), SampleChange),
+        % update and delete some pre and post split docs
+        AllDocs = [couch_doc:from_json_obj(Doc) || #{doc := Doc} <- Changes5],
+        UpdateDocs = lists:filtermap(fun
+            (#doc{id = <<"00002">>}) -> true;
+            (#doc{id = <<"00012">>}) -> true;
+            (#doc{id = <<"00004">>} = Doc) -> {true, Doc#doc{deleted = true}};
+            (#doc{id = <<"00014">>} = Doc) -> {true, Doc#doc{deleted = true}};
+            (_) -> false
+        end, AllDocs),
+        update_docs(Db, UpdateDocs),
+        Args6 = BaseArgs#changes_args{since = EndSeq5},
+        {ok, Changes6, EndSeq6} = get_changes_feed(Db, Args6),
+        UpdatedChanges = [#{id => ID} || #doc{id = ID} <- UpdateDocs],
+        ?assertChanges(UpdatedChanges, Changes6),
+        [#{seq := Seq6} | _] = Changes6,
+        ?assertEqual(EndSeq6, Seq6),
+        Args7 = Args6#changes_args{dir = rev, limit = 4},
+        {ok, Changes7, EndSeq7} = get_changes_feed(Db, Args7),
+        %% FIXME - I believe rev is just conceptually broken in Couch 2.x
+        % ?assertChanges(lists:reverse(Changes6), Changes7),
+        ?assertEqual(4, length(Changes7)),
+        [#{seq := Seq7} | _] = Changes7,
+        ?assertEqual(EndSeq7, Seq7)
+    end).
+continuous_feed_should_work_during_split(#{db1 := Db}) ->
+    ?_test(begin
+        {UpdaterPid, UpdaterRef} = spawn_monitor(fun() ->
+            Updater = fun U({State, I}) ->
+                receive
+                    {get_state, {Pid, Ref}} ->
+                        Pid ! {state, Ref, State},
+                        U({State, I});
+                    add ->
+                        DocSpec = #{docs => [I, I]},
+                        add_test_docs(Db, DocSpec),
+                        U({State, I + 1});
+                    split ->
+                        spawn_monitor(fun() -> split_and_wait(Db) end),
+                        U({"in_process", I});
+                    stop ->
+                        ok;
+                    {'DOWN', _, process, _, normal} ->
+                        U({"after", I})
+                end
+            end,
+            Updater({"before", 1})
+        end),
+        Callback = fun
+            (start, Acc) ->
+                {ok, Acc};
+            (waiting_for_updates, Acc) ->
+                Ref = make_ref(),
+                UpdaterPid ! {get_state, {self(), Ref}},
+                receive {state, Ref, State} -> ok end,
+                case {State, length(Acc)} of
+                    {"before", N} when N < 5 ->
+                        UpdaterPid ! add;
+                    {"before", _} ->
+                        UpdaterPid ! split;
+                    {"in_process", N} when N < 10 ->
+                        UpdaterPid ! add;
+                    {"in_process", _} ->
+                        wait;
+                    {"after", N} when N < 15 ->
+                        UpdaterPid ! add;
+                    {"after", _} ->
+                        UpdaterPid ! stop,
+                        %% nfc why simple return of {stop, Acc} doesn't work
+                        exit({with_proc_res, {ok, Acc}})
+                end,
+                {ok, Acc};
+            (timeout, Acc) ->
+                {ok, Acc};
+            ({change, {Change}}, Acc) ->
+                CM = maps:from_list(Change),
+                {ok, [CM | Acc]};
+            ({stop, EndSeq, _Pending}, _Acc) ->
+                UpdaterPid ! stop,
+                Error = {assertion_fail, [
+                    {module, ?MODULE},
+                    {line, ?LINE},
+                    {value, {last_seq, EndSeq}},
+                    {reason, "Changes feed stopped on shard split"}
+                ]},
+                exit({with_proc_res, {error, Error}})
+        end,
+        BaseArgs = #changes_args{
+            feed = "continuous",
+            heartbeat = 100,
+            timeout = 1000
+        },
+        Result = get_changes_feed(Db, BaseArgs, Callback),
+        receive
+            {'DOWN', UpdaterRef, process, UpdaterPid, normal} ->
+                ok;
+            {'DOWN', UpdaterRef, process, UpdaterPid, Error} ->
+                erlang:error({test_context_failed, [
+                    {module, ?MODULE},
+                    {line, ?LINE},
+                    {value, Error},
+                    {reason, "Updater died"}]})
+        end,
+        case Result of
+            {ok, Changes} ->
+                Expected = [#{id => ID} || #doc{id = ID} <- docs([1, 15])],
+                ?assertChanges(Expected, Changes);
+            {error, Err} ->
+                erlang:error(Err)
+        end
+    end).
+split_and_wait(Db) ->
+    [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+    {ok, JobId} = mem3_reshard:start_split_job(Shard),
+    wait_state(JobId, completed),
+    ResultShards = lists:sort(mem3:local_shards(Db)),
+    ?assertEqual(2, length(ResultShards)).
+wait_state(JobId, State) ->
+    test_util:wait(fun() ->
+        case mem3_reshard:job(JobId) of
+            {ok, {Props}} ->
+                case couch_util:get_value(job_state, Props) of
+                    State -> ok;
+                    _ -> timer:sleep(100), wait
+                end;
+            {error, not_found} -> timer:sleep(100), wait
+        end
+    end, 30000).
+get_changes_feed(Db, Args) ->
+    get_changes_feed(Db, Args, fun changes_callback/2).
+get_changes_feed(Db, Args, Callback) ->
+    with_proc(fun() ->
+        fabric:changes(Db, Callback, [], Args)
+    end).
+changes_callback(start, Acc) ->
+    {ok, Acc};
+changes_callback({change, {Change}}, Acc) ->
+    CM = maps:from_list(Change),
+    {ok, [CM | Acc]};
+changes_callback({stop, EndSeq, _Pending}, Acc) ->
+    {ok, Acc, EndSeq}.
+%% common helpers from here
+create_db(DbName, Opts) ->
+    GL = erlang:group_leader(),
+    with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
+delete_db(DbName) ->
+    GL = erlang:group_leader(),
+    with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
+with_proc(Fun) ->
+    with_proc(Fun, undefined, 30000).
+with_proc(Fun, GroupLeader) ->
+    with_proc(Fun, GroupLeader, 30000).
+with_proc(Fun, GroupLeader, Timeout) ->
+    {Pid, Ref} = spawn_monitor(fun() ->
+        case GroupLeader of
+            undefined -> ok;
+            _ -> erlang:group_leader(GroupLeader, self())
+        end,
+        exit({with_proc_res, Fun()})
+    end),
+    receive
+        {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
+            Res;
+        {'DOWN', Ref, process, Pid, Error} ->
+            error(Error)
+    after Timeout ->
+        erlang:demonitor(Ref, [flush]),
+        exit(Pid, kill),
+        error({with_proc_timeout, Fun, Timeout})
+   end.
+add_test_docs(DbName, #{} = DocSpec) ->
+    Docs = docs(maps:get(docs, DocSpec, []))
+        ++ ddocs(mrview, maps:get(mrview, DocSpec, []))
+        ++ ddocs(search, maps:get(search, DocSpec, []))
+        ++ ddocs(geo, maps:get(geo, DocSpec, []))
+        ++ ldocs(maps:get(local, DocSpec, [])),
+    Res = update_docs(DbName, Docs),
+    Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
+        Doc#doc{revs = {RevPos, [Rev]}}
+    end, lists:zip(Docs, Res)),
+    case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
+        [] -> ok;
+        [_ | _] = Deleted -> update_docs(DbName, Deleted)
+    end,
+    ok.
+update_docs(DbName, Docs) ->
+    with_proc(fun() ->
+        case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
+            {accepted, Res} -> Res;
+            {ok, Res} -> Res
+        end
+    end).
+delete_docs([S, E], Docs) when E >= S ->
+    ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
+    lists:filtermap(fun(#doc{id = Id} = Doc) ->
+        case lists:member(Id, ToDelete) of
+            true -> {true, Doc#doc{deleted = true}};
+            false -> false
+        end
+    end, Docs);
+delete_docs(_, _) ->
+    [].
+docs([S, E]) when E >= S ->
+    [doc(<<"">>, I) || I <- lists:seq(S, E)];
+docs(_) ->
+    [].
+ddocs(Type, [S, E]) when E >= S ->
+    Body = ddprop(Type),
+    BType = atom_to_binary(Type, utf8),
+    [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)];
+ddocs(_, _) ->
+    [].
+ldocs([S, E]) when E >= S ->
+    [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)];
+ldocs(_) ->
+    [].
+doc(Pref, Id) ->
+    Body = bodyprops(),
+    doc(Pref, Id, Body, 42).
+doc(Pref, Id, BodyProps, AttSize) ->
+    #doc{
+        id = doc_id(Pref, Id),
+        body = {BodyProps},
+        atts = atts(AttSize)
+    }.
+doc_id(Pref, Id) ->
+    IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])),
+    <<Pref/binary, IdBin/binary>>.
+ddprop(mrview) ->
+    [
+        {<<"views">>, {[
+             {<<"v1">>, {[
+                 {<<"map">>, <<"function(d){emit(d);}">>}
+             ]}}
+        ]}}
+    ];
+ddprop(geo) ->
+    [
+        {<<"st_indexes">>, {[
+            {<<"area">>, {[
+                {<<"analyzer">>, <<"standard">>},
+                {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> }
+            ]}}
+        ]}}
+    ];
+ddprop(search) ->
+    [
+        {<<"indexes">>, {[
+            {<<"types">>, {[
+                {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>}
+            ]}}
+        ]}}
+   ].
+bodyprops() ->
+    [
+        {<<"g">>, {[
+            {<<"type">>, <<"Polygon">>},
+            {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]}
+        ]}}
+   ].
+atts(0) ->
+    [];
+atts(Size) when is_integer(Size), Size >= 1 ->
+    Data = << <<"x">> || _ <- lists:seq(1, Size) >>,
+    [couch_att:new([
+        {name, <<"att">>},
+        {type, <<"app/binary">>},
+        {att_len, Size},
+        {data, Data}
+    ])].