Posted to commits@couchdb.apache.org by va...@apache.org on 2019/02/22 13:39:16 UTC

[couchdb] branch shard-split updated: Add more shard splitting tests

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/shard-split by this push:
     new f26e6f8  Add more shard splitting tests
f26e6f8 is described below

commit f26e6f86f0ba3bbf71d164b72ff014e6810b3a12
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Feb 22 08:35:34 2019 -0500

    Add more shard splitting tests
    
    Check that shard splitting jobs properly preserve db data and metadata
    
    Also reduce the logging level in a few places.
---
 src/mem3/src/mem3_reshard.erl       |   5 +-
 src/mem3/src/mem3_reshard_job.erl   |  17 +-
 src/mem3/src/mem3_reshard_store.erl |   2 +-
 src/mem3/test/mem3_reshard_test.erl | 487 ++++++++++++++++++++++++++++++++++++
 4 files changed, 501 insertions(+), 10 deletions(-)

diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
index 7cd510c..b2ca436 100644
--- a/src/mem3/src/mem3_reshard.erl
+++ b/src/mem3/src/mem3_reshard.erl
@@ -130,7 +130,6 @@ job(JobId) ->
 
 -spec report(pid(), #job{}) -> ok.
 report(Server, #job{} = Job) when is_pid(Server) ->
-    couch_log:notice("~p reporting ~p ~p", [?MODULE, Server, jobfmt(Job)]),
     gen_server:cast(Server, {report, Job}).
 
 
@@ -650,7 +649,7 @@ info_delete(Key, StateInfo) ->
 
 -spec checkpoint_int(#job{}, #state{}) -> #state{}.
 checkpoint_int(#job{} = Job, State) ->
-    couch_log:notice("~p checkpoint ~s", [?MODULE, jobfmt(Job)]),
+    couch_log:debug("~p checkpoint ~s", [?MODULE, jobfmt(Job)]),
     case report_int(Job) of
         ok ->
             ok = mem3_reshard_store:store_job(State, Job),
@@ -668,7 +667,7 @@ report_int(Job) ->
         [#job{ref = Ref, pid = OldPid}] ->
             case Job#job.pid =:= OldPid of
                 true ->
-                    couch_log:notice("~p reported ~s", [?MODULE, jobfmt(Job)]),
+                    couch_log:debug("~p reported ~s", [?MODULE, jobfmt(Job)]),
                     % Carry over the reference from ets as the #job{} coming
                     % from the job process won't have its own monitor ref.
                     true = ets:insert(?MODULE, Job#job{ref = Ref}),
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index 40be94e..00eafdf 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -84,8 +84,17 @@ init([#job{} = Job0]) ->
     {ok, Job}.
 
 
+terminate(normal, _Job) ->
+    ok;
+
+terminate(shutdown, _Job) ->
+    ok;
+
+terminate({shutdown, _}, _Job) ->
+    ok;
+
 terminate(Reason, Job) ->
-    couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, Job]),
+    couch_log:error("~p terminate ~p ~p", [?MODULE, Reason, Job]),
     ok.
 
 
@@ -121,7 +130,6 @@ handle_info(retry, #job{} = Job) ->
     handle_cast(do_state, Job);
 
 handle_info({'EXIT', Pid, Reason}, #job{workers = Workers} = Job) ->
-    couch_log:notice("~p EXIT pid:~p reason:~p", [?MODULE, Pid, Reason]),
     case lists:member(Pid, Workers) of
         true ->
             Job1 = Job#job{workers = Workers -- [Pid]},
@@ -296,13 +304,10 @@ report(#job{manager = ManagerPid} = Job) ->
 -spec worker_exited(any(), #job{}) ->
     {noreply, #job{}} | {stop, any()} | {retry, any(), #job{}}.
 worker_exited(normal, #job{split_state = State, workers = []} = Job) ->
-    couch_log:notice("~p LAST worker exited ~p", [?MODULE, jobfmt(Job)]),
+    couch_log:notice("~p last worker exited ~p", [?MODULE, jobfmt(Job)]),
     {noreply, switch_state(Job, next_state(State))};
 
 worker_exited(normal, #job{workers = Workers} = Job) when Workers =/= [] ->
-    WaitingOn = length(Workers),
-    Msg = "~p worker exited normal ~p, waiting on ~p more workers",
-    couch_log:debug(Msg, [?MODULE, jobfmt(Job), WaitingOn]),
     {noreply, Job};
 
 worker_exited({error, missing_source}, #job{} = Job) ->
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl
index 4d0b4cc..607ecc6 100644
--- a/src/mem3/src/mem3_reshard_store.erl
+++ b/src/mem3/src/mem3_reshard_store.erl
@@ -154,7 +154,7 @@ update_doc(Db, DocId, Body) ->
     case store_state() of
         true ->
             {ok, _} = couch_db:update_doc(Db, Doc, []),
-            couch_log:notice("~p updated doc ~p ~p", [?MODULE, DocId, Body]),
+            couch_log:debug("~p updated doc ~p ~p", [?MODULE, DocId, Body]),
             {ok, _} = couch_db:ensure_full_commit(Db),
             ok;
         false ->
diff --git a/src/mem3/test/mem3_reshard_test.erl b/src/mem3/test/mem3_reshard_test.erl
new file mode 100644
index 0000000..2f8407c
--- /dev/null
+++ b/src/mem3/test/mem3_reshard_test.erl
@@ -0,0 +1,487 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function
+
+-define(ID, <<"_id">>).
+
+setup() ->
+    {Db1, Db2} = {?tempdb(), ?tempdb()},
+    create_db(Db1, [{q, 1}, {n, 1}]),
+    PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
+    create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]),
+    #{db1 => Db1, db2 => Db2}.
+
+
+teardown(#{} = Dbs) ->
+    mem3_reshard:reset_state(),
+    maps:map(fun(_, Db) -> delete_db(Db) end, Dbs),
+    meck:unload().
+
+
+start_couch() ->
+    test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]).
+
+
+stop_couch(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+
+mem3_reshard_db_test_() ->
+    {
+        "mem3 shard split db tests",
+        {
+            setup,
+            fun start_couch/0, fun stop_couch/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun split_one_shard/1,
+                    fun update_docs_before_topoff1/1
+                ]
+            }
+        }
+    }.
+
+
+% This is a basic test to check that shard splitting preserves documents and
+% db meta props such as the revs limit and the security object.
+split_one_shard(#{db1 := Db}) ->
+    ?_test(begin
+        DocSpec = #{
+            docs => [0, 9],
+            delete => [5, 9],
+            mrview => [0, 0],
+            search => [0, 0],
+            geo => [0, 0],
+            local => [0, 0]
+        },
+        add_test_docs(Db, DocSpec),
+
+        % Save documents before the split
+        Docs0 = get_all_docs(Db),
+        Local0 = get_local_docs(Db),
+
+        % Set some custom metadata properties
+        set_revs_limit(Db, 942),
+        set_purge_infos_limit(Db, 943),
+        SecObj = {[{<<"foo">>, <<"bar">>}]},
+        set_security(Db, SecObj),
+
+        % DbInfo is saved after setting metadata bits
+        % as those could bump the update sequence
+        DbInfo0 = get_db_info(Db),
+
+        % Split the one shard
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+        wait_state(JobId, completed),
+
+        % Perform some basic checks that the shard was split
+        ResultShards = lists:sort(mem3:local_shards(Db)),
+        ?assertEqual(2, length(ResultShards)),
+        [#shard{range = R1}, #shard{range = R2}] = ResultShards,
+        ?assertEqual([16#00000000, 16#7fffffff], R1),
+        ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+        % Check metadata bits after the split
+        ?assertEqual(942, get_revs_limit(Db)),
+        ?assertEqual(943, get_purge_infos_limit(Db)),
+        ?assertEqual(SecObj, get_security(Db)),
+
+        DbInfo1 = get_db_info(Db),
+        Docs1 = get_all_docs(Db),
+        Local1 = get_local_docs(Db),
+
+        % When comparing db infos, ignore the update sequences; they won't be
+        % the same since there are more shards involved after the split
+        ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+        % The update seq prefix number is the sum of all shard update sequences
+        #{<<"update_seq">> := UpdateSeq0} = DbInfo0,
+        #{<<"update_seq">> := UpdateSeq1} = DbInfo1,
+        ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
+
+        % Finally compare that the documents are still there after the split
+        ?assertEqual(Docs0, Docs1),
+
+        % Also check the local docs, but exclude the internal checkpoint docs
+        % as some of those are munged and transformed during the split
+        ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+    end).
+
+
+% This test checks that documents added while the shard is being split are
+% not lost. The topoff1 state happens before indices are built.
+update_docs_before_topoff1(#{db1 := Db}) ->
+    ?_test(begin
+        add_test_docs(Db, #{docs => [0, 9]}),
+
+        intercept_state(topoff1),
+
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+
+        receive {JobPid, topoff1} -> ok end,
+        add_test_docs(Db, #{docs => [10, 19], local => [0, 0]}),
+        Docs0 = get_all_docs(Db),
+        Local0 = get_local_docs(Db),
+        DbInfo0 = get_db_info(Db),
+        JobPid ! continue,
+
+        wait_state(JobId, completed),
+
+        % Perform some basic checks that the shard was split
+        ResultShards = lists:sort(mem3:local_shards(Db)),
+        ?assertEqual(2, length(ResultShards)),
+
+        DbInfo1 = get_db_info(Db),
+        Docs1 = get_all_docs(Db),
+        Local1 = get_local_docs(Db),
+
+        ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+        % The update sequence after the initial copy of 10 docs would be 10
+        % on each target shard (to match the source), making the total update
+        % sequence 20. Then 10 more docs were added (3 might have ended up on
+        % one target and 7 on the other), so the final update sequence is
+        % 20 + 10 = 30.
+        ?assertMatch(#{<<"update_seq">> := 30}, DbInfo1),
+
+        ?assertEqual(Docs0, Docs1),
+        ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+    end).
+
+
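+% Pause a split job when it reaches the given state. checkpoint_done/1 is
+% mocked so that when the job checkpoints State it sends {JobPid, State} to
+% the test process, then blocks until it receives `continue` or `cancel`.
+% Split jobs move through their states in order (on this branch roughly:
+% new, initial_copy, topoff1, build_indices, topoff2, copy_local_docs,
+% update_shardmap, wait_source_close, topoff3, source_delete, completed),
+% so intercepting topoff1 pauses the job before any indices are built.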
+intercept_state(State) ->
+    TestPid = self(),
+    meck:new(mem3_reshard_job, [passthrough]),
+    meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+            case Job#job.split_state of
+                State ->
+                    TestPid ! {self(), State},
+                    receive
+                        continue -> meck:passthrough([Job]);
+                        cancel -> ok
+                    end;
+                _ ->
+                    meck:passthrough([Job])
+            end
+        end).
+
+
+cancel_intercept() ->
+    meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+        meck:passthrough([Job])
+    end).
+
+
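+% Poll the resharding manager until the job reports the given job_state,
+% giving up after 30 seconds.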
+wait_state(JobId, State) ->
+    test_util:wait(fun() ->
+            case mem3_reshard:job(JobId) of
+                {ok, {Props}} ->
+                    case couch_util:get_value(job_state, Props) of
+                        State -> ok;
+                        _ -> timer:sleep(100), wait
+                    end;
+                {error, not_found} -> timer:sleep(100), wait
+            end
+    end, 30000).
+
+
+set_revs_limit(DbName, Limit) ->
+    with_proc(fun() -> fabric:set_revs_limit(DbName, Limit, [?ADMIN_CTX]) end).
+
+
+get_revs_limit(DbName) ->
+    with_proc(fun() -> fabric:get_revs_limit(DbName) end).
+
+
+get_purge_infos_limit(DbName) ->
+    with_proc(fun() -> fabric:get_purge_infos_limit(DbName) end).
+
+
+set_purge_infos_limit(DbName, Limit) ->
+    with_proc(fun() ->
+        fabric:set_purge_infos_limit(DbName, Limit, [?ADMIN_CTX])
+    end).
+
+
+set_security(DbName, SecObj) ->
+    with_proc(fun() -> fabric:set_security(DbName, SecObj) end).
+
+
+get_security(DbName) ->
+    with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end).
+
+
+get_db_info(DbName) ->
+    with_proc(fun() ->
+        {ok, Info} = fabric:get_db_info(DbName),
+        Map = maps:with([
+            <<"db_name">>, <<"doc_count">>, <<"props">>,  <<"doc_del_count">>,
+            <<"update_seq">>, <<"purge_seq">>, <<"disk_format_version">>
+        ], to_map(Info)),
+        update_seq_to_num(Map)
+    end).
+
+
+get_group_info(DbName, DesignId) ->
+    with_proc(fun() ->
+        {ok, GInfo} = fabric:get_view_group_info(DbName, DesignId),
+        Map = maps:with([
+            <<"language">>, <<"purge_seq">>, <<"signature">>, <<"update_seq">>
+        ], to_map(GInfo)),
+        update_seq_to_num(Map)
+    end).
+
+
+get_partition_info(DbName, Partition) ->
+    with_proc(fun() ->
+        {ok, PInfo} = fabric:get_partition_info(DbName, Partition),
+        maps:with([
+            <<"db_name">>, <<"doc_count">>, <<"doc_del_count">>, <<"partition">>
+        ], to_map(PInfo))
+    end).
+
+
+get_all_docs(DbName) ->
+    get_all_docs(DbName, #mrargs{}).
+
+
+get_all_docs(DbName, #mrargs{} = QArgs0) ->
+    GL = erlang:group_leader(),
+    with_proc(fun() ->
+        Cb = fun
+            ({row, Props}, Acc) ->
+                Doc = to_map(couch_util:get_value(doc, Props)),
+                #{?ID := Id} = Doc,
+                {ok, Acc#{Id => Doc}};
+            ({meta, _}, Acc) -> {ok, Acc};
+            (complete, Acc) -> {ok, Acc}
+        end,
+        QArgs = QArgs0#mrargs{include_docs = true},
+        {ok, Docs} = fabric:all_docs(DbName, Cb, #{}, QArgs),
+        Docs
+    end, GL).
+
+
+get_local_docs(DbName) ->
+    LocalNS = {namespace, <<"_local">>},
+    maps:map(fun(_, Doc) ->
+        maps:without([<<"_rev">>], Doc)
+    end, get_all_docs(DbName, #mrargs{extra = [LocalNS]})).
+
+
+without_seqs(#{} = InfoMap) ->
+    maps:without([<<"update_seq">>, <<"purge_seq">>], InfoMap).
+
+
+without_meta_locals(#{} = Local) ->
+    maps:filter(fun
+        (<<"_local/purge-mrview-", _/binary>>, _) -> false;
+        (<<"_local/shard-sync-", _/binary>>, _) -> false;
+        (_, _) -> true
+    end, Local).
+
+
+update_seq_to_num(#{} = InfoMap) ->
+    maps:map(fun
+        (<<"update_seq">>, Seq) -> seq_to_num(Seq);
+        (<<"purge_seq">>, PSeq) -> seq_to_num(PSeq);
+        (_, V) -> V
+    end, InfoMap).
+
+
+seq_to_num(Seq) ->
+    [SeqNum, _] = binary:split(Seq, <<"-">>),
+    binary_to_integer(SeqNum).
+
+
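+% Convert EJson ({[...]} proplist form) to maps by round-tripping through
+% jiffy; e.g. to_map({[{<<"a">>, 1}]}) returns #{<<"a">> => 1}, with nested
+% objects converted recursively.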
+to_map([_ | _] = Props) ->
+    to_map({Props});
+
+to_map({[_ | _]} = EJson) ->
+    jiffy:decode(jiffy:encode(EJson), [return_maps]).
+
+
+create_db(DbName, Opts) ->
+    GL = erlang:group_leader(),
+    with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
+
+
+delete_db(DbName) ->
+    GL = erlang:group_leader(),
+    with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
+
+
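+% Run Fun in a short-lived monitored process and return its result (or fail
+% with its exit reason). Fabric calls are made from a throwaway process so
+% that any state they leave behind (in the process dictionary, for instance)
+% does not leak into the test process; forwarding a group leader keeps io
+% output from the helper visible to the test runner.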
+with_proc(Fun) ->
+    with_proc(Fun, undefined, 30000).
+
+
+with_proc(Fun, GroupLeader) ->
+    with_proc(Fun, GroupLeader, 30000).
+
+
+with_proc(Fun, GroupLeader, Timeout) ->
+    {Pid, Ref} = spawn_monitor(fun() ->
+        case GroupLeader of
+            undefined -> ok;
+            _ -> erlang:group_leader(GroupLeader, self())
+        end,
+        exit({with_proc_res, Fun()})
+    end),
+    receive
+        {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
+            Res;
+        {'DOWN', Ref, process, Pid, Error} ->
+            error(Error)
+    after Timeout ->
+        erlang:demonitor(Ref, [flush]),
+        exit(Pid, kill),
+        error({with_proc_timeout, Fun, Timeout})
+    end.
+
+
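+% Populate DbName according to DocSpec. Each value is an inclusive
+% [Start, End] id range: for example #{docs => [0, 9], delete => [5, 9]}
+% creates docs 00000 through 00009, then deletes 00005 through 00009.
+% mrview/search/geo add design docs of that flavor and `local` adds _local
+% docs.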
+add_test_docs(DbName, #{} = DocSpec) ->
+    Docs = docs(maps:get(docs, DocSpec, []))
+        ++ ddocs(mrview, maps:get(mrview, DocSpec, []))
+        ++ ddocs(search, maps:get(search, DocSpec, []))
+        ++ ddocs(geo, maps:get(geo, DocSpec, []))
+        ++ ldocs(maps:get(local, DocSpec, [])),
+    Res = update_docs(DbName, Docs),
+    Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
+        Doc#doc{revs = {RevPos, [Rev]}}
+    end, lists:zip(Docs, Res)),
+    case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
+        [] -> ok;
+        [_ | _] = Deleted -> update_docs(DbName, Deleted)
+    end,
+    ok.
+
+
+update_docs(DbName, Docs) ->
+    with_proc(fun() ->
+        case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
+            {accepted, Res} -> Res;
+            {ok, Res} -> Res
+        end
+    end).
+
+
+delete_docs([S, E], Docs) when E >= S ->
+    ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
+    lists:filtermap(fun(#doc{id = Id} = Doc) ->
+        case lists:member(Id, ToDelete) of
+            true -> {true, Doc#doc{deleted = true}};
+            false -> false
+        end
+    end, Docs);
+delete_docs(_, _) ->
+    [].
+
+
+docs([S, E]) when E >= S ->
+    [doc(<<"">>, I) || I <- lists:seq(S, E)];
+docs(_) ->
+    [].
+
+
+ddocs(Type, [S, E]) when E >= S ->
+    Body = ddprop(Type),
+    BType = atom_to_binary(Type, utf8),
+    [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)];
+ddocs(_, _) ->
+    [].
+
+
+ldocs([S, E]) when E >= S ->
+    [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)];
+ldocs(_) ->
+    [].
+
+
+doc(Pref, Id) ->
+    Body = bodyprops(),
+    doc(Pref, Id, Body, 42).
+
+
+doc(Pref, Id, BodyProps, AttSize) ->
+    #doc{
+        id = doc_id(Pref, Id),
+        body = {BodyProps},
+        atts = atts(AttSize)
+    }.
+
+
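+% Build a zero-padded doc id so that lexical order matches numeric order;
+% e.g. doc_id(<<"_local/">>, 7) yields <<"_local/00007">>.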
+doc_id(Pref, Id) ->
+    IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])),
+    <<Pref/binary, IdBin/binary>>.
+
+
+ddprop(mrview) ->
+    [
+        {<<"views">>, {[
+             {<<"v1">>, {[
+                 {<<"map">>, <<"function(d){emit(d);}">>}
+             ]}}
+        ]}}
+    ];
+
+ddprop(geo) ->
+    [
+        {<<"st_indexes">>, {[
+            {<<"area">>, {[
+                {<<"analyzer">>, <<"standard">>},
+                {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> }
+            ]}}
+        ]}}
+    ];
+
+ddprop(search) ->
+    [
+        {<<"indexes">>, {[
+            {<<"types">>, {[
+                {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>}
+            ]}}
+        ]}}
+    ].
+
+
+bodyprops() ->
+    [
+        {<<"g">>, {[
+            {<<"type">>, <<"Polygon">>},
+            {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]}
+        ]}}
+    ].
+
+
+atts(0) ->
+    [];
+
+atts(Size) when is_integer(Size), Size >= 1 ->
+    Data = << <<"x">> || _ <- lists:seq(1, Size) >>,
+    [couch_att:new([
+        {name, <<"att">>},
+        {type, <<"app/binary">>},
+        {att_len, Size},
+        {data, Data}
+    ])].