You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/02/22 13:39:16 UTC
[couchdb] branch shard-split updated: Add more shard splitting tests
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/shard-split by this push:
new f26e6f8 Add more shard splitting tests
f26e6f8 is described below
commit f26e6f86f0ba3bbf71d164b72ff014e6810b3a12
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Feb 22 08:35:34 2019 -0500
Add more shard splitting tests
Check that shard splitting jobs preserve db data and metadata properly.
Also reduce logging level in a few places.
---
src/mem3/src/mem3_reshard.erl | 5 +-
src/mem3/src/mem3_reshard_job.erl | 17 +-
src/mem3/src/mem3_reshard_store.erl | 2 +-
src/mem3/test/mem3_reshard_test.erl | 487 ++++++++++++++++++++++++++++++++++++
4 files changed, 501 insertions(+), 10 deletions(-)
diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
index 7cd510c..b2ca436 100644
--- a/src/mem3/src/mem3_reshard.erl
+++ b/src/mem3/src/mem3_reshard.erl
@@ -130,7 +130,6 @@ job(JobId) ->
+% Hands Job's latest state to Server via gen_server:cast/2. The cast is
+% asynchronous, so this returns ok without waiting for Server to handle
+% the {report, Job} message.
-spec report(pid(), #job{}) -> ok.
report(Server, #job{} = Job) when is_pid(Server) ->
- couch_log:notice("~p reporting ~p ~p", [?MODULE, Server, jobfmt(Job)]),
gen_server:cast(Server, {report, Job}).
@@ -650,7 +649,7 @@ info_delete(Key, StateInfo) ->
-spec checkpoint_int(#job{}, #state{}) -> #state{}.
checkpoint_int(#job{} = Job, State) ->
- couch_log:notice("~p checkpoint ~s", [?MODULE, jobfmt(Job)]),
+ couch_log:debug("~p checkpoint ~s", [?MODULE, jobfmt(Job)]),
case report_int(Job) of
ok ->
ok = mem3_reshard_store:store_job(State, Job),
@@ -668,7 +667,7 @@ report_int(Job) ->
[#job{ref = Ref, pid = OldPid}] ->
case Job#job.pid =:= OldPid of
true ->
- couch_log:notice("~p reported ~s", [?MODULE, jobfmt(Job)]),
+ couch_log:debug("~p reported ~s", [?MODULE, jobfmt(Job)]),
% Carry over the reference from ets as the #job{} coming
% from the job process won't have it's own monitor ref.
true = ets:insert(?MODULE, Job#job{ref = Ref}),
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index 40be94e..00eafdf 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -84,8 +84,17 @@ init([#job{} = Job0]) ->
{ok, Job}.
+% normal, shutdown and {shutdown, _} are the standard non-error OTP exit
+% reasons, so the job terminates quietly for them; any other reason is
+% logged as an error. The job record is not needed in the quiet clauses,
+% hence _Job (avoids unused-variable compiler warnings).
+terminate(normal, _Job) ->
+    ok;
+
+terminate(shutdown, _Job) ->
+    ok;
+
+terminate({shutdown, _}, _Job) ->
+    ok;
+
terminate(Reason, Job) ->
- couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, Job]),
+ couch_log:error("~p terminate ~p ~p", [?MODULE, Reason, Job]),
ok.
@@ -121,7 +130,6 @@ handle_info(retry, #job{} = Job) ->
handle_cast(do_state, Job);
handle_info({'EXIT', Pid, Reason}, #job{workers = Workers} = Job) ->
- couch_log:notice("~p EXIT pid:~p reason:~p", [?MODULE, Pid, Reason]),
case lists:member(Pid, Workers) of
true ->
Job1 = Job#job{workers = Workers -- [Pid]},
@@ -296,13 +304,10 @@ report(#job{manager = ManagerPid} = Job) ->
-spec worker_exited(any(), #job{}) ->
{noreply, #job{}} | {stop, any()} | {retry, any(), #job{}}.
worker_exited(normal, #job{split_state = State, workers = []} = Job) ->
- couch_log:notice("~p LAST worker exited ~p", [?MODULE, jobfmt(Job)]),
+ couch_log:notice("~p last worker exited ~p", [?MODULE, jobfmt(Job)]),
{noreply, switch_state(Job, next_state(State))};
worker_exited(normal, #job{workers = Workers} = Job) when Workers =/= [] ->
- WaitingOn = length(Workers),
- Msg = "~p worker exited normal ~p, waiting on ~p more workers",
- couch_log:debug(Msg, [?MODULE, jobfmt(Job), WaitingOn]),
{noreply, Job};
worker_exited({error, missing_source}, #job{} = Job) ->
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl
index 4d0b4cc..607ecc6 100644
--- a/src/mem3/src/mem3_reshard_store.erl
+++ b/src/mem3/src/mem3_reshard_store.erl
@@ -154,7 +154,7 @@ update_doc(Db, DocId, Body) ->
case store_state() of
true ->
{ok, _} = couch_db:update_doc(Db, Doc, []),
- couch_log:notice("~p updated doc ~p ~p", [?MODULE, DocId, Body]),
+ couch_log:debug("~p updated doc ~p ~p", [?MODULE, DocId, Body]),
{ok, _} = couch_db:ensure_full_commit(Db),
ok;
false ->
diff --git a/src/mem3/test/mem3_reshard_test.erl b/src/mem3/test/mem3_reshard_test.erl
new file mode 100644
index 0000000..2f8407c
--- /dev/null
+++ b/src/mem3/test/mem3_reshard_test.erl
@@ -0,0 +1,487 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function
+
+-define(ID, <<"_id">>).
+
+% Per-test fixture: creates two single-shard (q=1, n=1) databases, a plain
+% one and a partitioned one, and returns them keyed db1 and db2 so tests
+% can match either by name. teardown/1 deletes every db in this map.
+setup() ->
+    {Db1, Db2} = {?tempdb(), ?tempdb()},
+    create_db(Db1, [{q, 1}, {n, 1}]),
+    PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
+    create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]),
+    #{db1 => Db1, db2 => Db2}.
+
+
+% Per-test cleanup: calls mem3_reshard:reset_state(), deletes every database
+% created by setup/0 and unloads any meck mocks installed by the test.
+teardown(#{} = Dbs) ->
+    mem3_reshard:reset_state(),
+    % Deletion is wanted only for its side effect, so iterate the values
+    % rather than building a throwaway result map with maps:map/2.
+    lists:foreach(fun delete_db/1, maps:values(Dbs)),
+    meck:unload().
+
+
+% Suite-level setup: starts the test CouchDB instance through test_util,
+% together with the mem3 and fabric applications.
+start_couch() ->
+    Apps = [mem3, fabric],
+    test_util:start_couch(?CONFIG_CHAIN, Apps).
+
+
+% Suite-level cleanup: stops what start_couch/0 started, given the context
+% value it returned.
+stop_couch(Context) ->
+    test_util:stop_couch(Context).
+
+
+% EUnit generator: CouchDB is started once for the whole suite, and every
+% test listed below is instantiated with fresh databases from setup/0 and
+% cleaned up with teardown/1.
+mem3_reshard_db_test_() ->
+    Tests = [
+        fun split_one_shard/1,
+        fun update_docs_before_topoff1/1
+    ],
+    PerTest = {foreach, fun setup/0, fun teardown/1, Tests},
+    Suite = {setup, fun start_couch/0, fun stop_couch/1, PerTest},
+    {"mem3 shard split db tests", Suite}.
+
+
+% Basic check that splitting the only shard of a db preserves its documents
+% (regular, design and _local) and db metadata: the revs limit, the purge
+% infos limit and the security object.
+%
+% Wrapped in an explicit 60 second EUnit timeout: the default per-test
+% timeout is 5 seconds, while wait_state/2 alone may wait up to 30 seconds
+% for the split job to complete.
+split_one_shard(#{db1 := Db}) ->
+    {timeout, 60, ?_test(begin
+        DocSpec = #{
+            docs => [0, 9],
+            delete => [5, 9],
+            mrview => [0, 0],
+            search => [0, 0],
+            geo => [0, 0],
+            local => [0, 0]
+        },
+        add_test_docs(Db, DocSpec),
+
+        % Save documents before the split
+        Docs0 = get_all_docs(Db),
+        Local0 = get_local_docs(Db),
+
+        % Set some custom metadata properties
+        set_revs_limit(Db, 942),
+        set_purge_infos_limit(Db, 943),
+        SecObj = {[{<<"foo">>, <<"bar">>}]},
+        set_security(Db, SecObj),
+
+        % DbInfo is saved after setting metadata bits, as those could bump
+        % the update sequence
+        DbInfo0 = get_db_info(Db),
+
+        % Split the one shard
+        [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+        wait_state(JobId, completed),
+
+        % Perform some basic checks that the shard was split in two halves
+        ResultShards = lists:sort(mem3:local_shards(Db)),
+        ?assertEqual(2, length(ResultShards)),
+        [#shard{range = R1}, #shard{range = R2}] = ResultShards,
+        ?assertEqual([16#00000000, 16#7fffffff], R1),
+        ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+        % Check metadata bits after the split
+        ?assertEqual(942, get_revs_limit(Db)),
+        ?assertEqual(943, get_purge_infos_limit(Db)),
+        ?assertEqual(SecObj, get_security(Db)),
+
+        DbInfo1 = get_db_info(Db),
+        Docs1 = get_all_docs(Db),
+        Local1 = get_local_docs(Db),
+
+        % When comparing db infos, ignore update sequences: they won't be
+        % the same since there are more shards involved after the split
+        ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+        % Update seq prefix number is a sum of all shard update sequences
+        #{<<"update_seq">> := UpdateSeq0} = DbInfo0,
+        #{<<"update_seq">> := UpdateSeq1} = DbInfo1,
+        ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
+
+        % Finally compare that the documents are still there after the split
+        ?assertEqual(Docs0, Docs1),
+
+        % Also compare _local docs, but exclude internal checkpoint docs, as
+        % some of those are munged and transformed during the split
+        ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+    end)}.
+
+
+% Checks that documents added while the shard is being split are not lost.
+% The split job is paused when it checkpoints the topoff1 state (which
+% happens before indices are built). More docs are added, then the job is
+% resumed.
+%
+% Wrapped in an explicit 60 second EUnit timeout, since the 5 second default
+% is shorter than the time the split job may legitimately take.
+update_docs_before_topoff1(#{db1 := Db}) ->
+    {timeout, 60, ?_test(begin
+        add_test_docs(Db, #{docs => [0, 9]}),
+
+        intercept_state(topoff1),
+
+        [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+
+        % Wait for the job to pause in topoff1. If it never gets there, fail
+        % with a descriptive error instead of blocking until the EUnit
+        % timeout kills the test.
+        JobPid = receive
+            {Pid, topoff1} -> Pid
+        after 30000 ->
+            error({intercept_timeout, JobId, topoff1})
+        end,
+
+        add_test_docs(Db, #{docs => [10, 19], local => [0, 0]}),
+        Docs0 = get_all_docs(Db),
+        Local0 = get_local_docs(Db),
+        DbInfo0 = get_db_info(Db),
+        JobPid ! continue,
+
+        wait_state(JobId, completed),
+
+        % Perform some basic checks that the shard was split
+        ResultShards = lists:sort(mem3:local_shards(Db)),
+        ?assertEqual(2, length(ResultShards)),
+
+        DbInfo1 = get_db_info(Db),
+        Docs1 = get_all_docs(Db),
+        Local1 = get_local_docs(Db),
+
+        ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+        % After the initial copy of 10 docs, the update sequence would be 10
+        % on each target shard (to match the source), for a total of 20.
+        % Then 10 more docs were added (3 might have ended up on one target
+        % and 7 on the other), so the final update sequence would be
+        % 20 + 10 = 30.
+        ?assertMatch(#{<<"update_seq">> := 30}, DbInfo1),
+
+        ?assertEqual(Docs0, Docs1),
+        ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+    end)}.
+
+
+% Mocks mem3_reshard_job:checkpoint_done/1 (passthrough for everything
+% else). When the checkpointed job's split_state equals State, the calling
+% process (presumably the job itself) sends {self(), State} to the test
+% process and blocks. It resumes with the real implementation on
+% `continue`, or returns ok without it on `cancel`. Other states pass
+% straight through.
+intercept_state(State) ->
+    Parent = self(),
+    meck:new(mem3_reshard_job, [passthrough]),
+    Intercept = fun(Job) ->
+        case Job#job.split_state =:= State of
+            false ->
+                meck:passthrough([Job]);
+            true ->
+                Parent ! {self(), State},
+                receive
+                    continue -> meck:passthrough([Job]);
+                    cancel -> ok
+                end
+        end
+    end,
+    meck:expect(mem3_reshard_job, checkpoint_done, Intercept).
+
+
+% Re-points the mem3_reshard_job:checkpoint_done/1 mock at the real
+% implementation, undoing intercept_state/1. Currently not called by any
+% test in this module.
+cancel_intercept() ->
+    Passthrough = fun(Job) -> meck:passthrough([Job]) end,
+    meck:expect(mem3_reshard_job, checkpoint_done, Passthrough).
+
+
+% Blocks until mem3_reshard:job/1 reports job_state State for JobId, polling
+% through test_util:wait/2 with a 30000 timeout. Any result other than ok
+% (for example test_util:wait/2 giving up) raises
+% {wait_state_timeout, JobId, State, Result}. A stuck job therefore fails
+% the test here instead of the timeout being silently ignored by callers.
+wait_state(JobId, State) ->
+    Poll = fun() ->
+        case mem3_reshard:job(JobId) of
+            {ok, {Props}} ->
+                case couch_util:get_value(job_state, Props) of
+                    State -> ok;
+                    _ -> timer:sleep(100), wait
+                end;
+            {error, not_found} -> timer:sleep(100), wait
+        end
+    end,
+    case test_util:wait(Poll, 30000) of
+        ok -> ok;
+        Result -> error({wait_state_timeout, JobId, State, Result})
+    end.
+
+
+% Sets the revs limit of DbName through fabric, passing ?ADMIN_CTX.
+set_revs_limit(DbName, Limit) ->
+    Set = fun() -> fabric:set_revs_limit(DbName, Limit, [?ADMIN_CTX]) end,
+    with_proc(Set).
+
+
+% Reads the revs limit of DbName through fabric.
+get_revs_limit(DbName) ->
+    Get = fun() -> fabric:get_revs_limit(DbName) end,
+    with_proc(Get).
+
+
+% Reads the purge infos limit of DbName through fabric.
+get_purge_infos_limit(DbName) ->
+    Get = fun() -> fabric:get_purge_infos_limit(DbName) end,
+    with_proc(Get).
+
+
+% Sets the purge infos limit of DbName through fabric, passing ?ADMIN_CTX.
+set_purge_infos_limit(DbName, Limit) ->
+    Opts = [?ADMIN_CTX],
+    Set = fun() -> fabric:set_purge_infos_limit(DbName, Limit, Opts) end,
+    with_proc(Set).
+
+
+% Sets the security object of DbName through fabric. Like the other metadata
+% setters here (set_revs_limit/2, set_purge_infos_limit/2), it passes
+% ?ADMIN_CTX explicitly. This way the request does not depend on whatever
+% user context the spawned helper process would otherwise have.
+set_security(DbName, SecObj) ->
+    with_proc(fun() ->
+        fabric:set_security(DbName, SecObj, [?ADMIN_CTX])
+    end).
+
+
+% Reads the security object of DbName through fabric, passing ?ADMIN_CTX.
+get_security(DbName) ->
+    Opts = [?ADMIN_CTX],
+    with_proc(fun() -> fabric:get_security(DbName, Opts) end).
+
+
+% Fetches clustered db info and keeps only the fields the tests compare, as
+% a map. update_seq and purge_seq are reduced to their numeric prefix.
+get_db_info(DbName) ->
+    Keys = [
+        <<"db_name">>, <<"doc_count">>, <<"props">>, <<"doc_del_count">>,
+        <<"update_seq">>, <<"purge_seq">>, <<"disk_format_version">>
+    ],
+    with_proc(fun() ->
+        {ok, Info} = fabric:get_db_info(DbName),
+        update_seq_to_num(maps:with(Keys, to_map(Info)))
+    end).
+
+
+% Fetches view group info for design doc DesignId and keeps a few fields as
+% a map, with the sequence fields reduced to numbers. Currently unused by
+% the tests in this module.
+get_group_info(DbName, DesignId) ->
+    Keys = [<<"language">>, <<"purge_seq">>, <<"signature">>, <<"update_seq">>],
+    with_proc(fun() ->
+        {ok, GInfo} = fabric:get_view_group_info(DbName, DesignId),
+        update_seq_to_num(maps:with(Keys, to_map(GInfo)))
+    end).
+
+
+% Fetches info for one partition of DbName and keeps a few fields as a map.
+% Currently unused by the tests in this module.
+get_partition_info(DbName, Partition) ->
+    Keys = [<<"db_name">>, <<"doc_count">>, <<"doc_del_count">>, <<"partition">>],
+    with_proc(fun() ->
+        {ok, PInfo} = fabric:get_partition_info(DbName, Partition),
+        maps:with(Keys, to_map(PInfo))
+    end).
+
+
+% Returns the docs of DbName as a map of doc id => doc body (decoded to a
+% map), queried with include_docs = true. The 2-arity form accepts extra
+% #mrargs{} settings (e.g. a namespace). The query runs in a helper process
+% that shares the caller's group leader.
+get_all_docs(DbName) ->
+    get_all_docs(DbName, #mrargs{}).
+
+
+get_all_docs(DbName, #mrargs{} = QArgs0) ->
+    Leader = erlang:group_leader(),
+    QArgs = QArgs0#mrargs{include_docs = true},
+    Collect = fun
+        ({meta, _}, Acc) ->
+            {ok, Acc};
+        ({row, Props}, Acc) ->
+            #{?ID := Id} = Doc = to_map(couch_util:get_value(doc, Props)),
+            {ok, Acc#{Id => Doc}};
+        (complete, Acc) ->
+            {ok, Acc}
+    end,
+    with_proc(fun() ->
+        {ok, Docs} = fabric:all_docs(DbName, Collect, #{}, QArgs),
+        Docs
+    end, Leader).
+
+
+% Returns the _local docs of DbName (id => body map) with their "_rev"
+% field removed.
+get_local_docs(DbName) ->
+    QArgs = #mrargs{extra = [{namespace, <<"_local">>}]},
+    StripRev = fun(_Id, Doc) -> maps:remove(<<"_rev">>, Doc) end,
+    maps:map(StripRev, get_all_docs(DbName, QArgs)).
+
+
+% Drops the update_seq and purge_seq keys from an info map.
+without_seqs(#{} = InfoMap) ->
+    SeqKeys = [<<"update_seq">>, <<"purge_seq">>],
+    lists:foldl(fun maps:remove/2, InfoMap, SeqKeys).
+
+
+% Removes internal _local docs (purge-mrview-* and shard-sync-* checkpoints)
+% from an id => doc map, keeping every other entry.
+without_meta_locals(#{} = Local) ->
+    IsMeta = fun
+        (<<"_local/purge-mrview-", _/binary>>) -> true;
+        (<<"_local/shard-sync-", _/binary>>) -> true;
+        (_) -> false
+    end,
+    maps:filter(fun(Id, _Doc) -> not IsMeta(Id) end, Local).
+
+
+% Replaces the update_seq and purge_seq values of an info map with their
+% numeric prefix (see seq_to_num/1). Other values are left untouched.
+update_seq_to_num(#{} = InfoMap) ->
+    SeqKeys = [<<"update_seq">>, <<"purge_seq">>],
+    maps:map(fun(Key, Value) ->
+        case lists:member(Key, SeqKeys) of
+            true -> seq_to_num(Value);
+            false -> Value
+        end
+    end, InfoMap).
+
+
+% Parses the integer before the first "-" of a clustered sequence binary,
+% e.g. <<"12-g1AAAA...">> -> 12. Crashes if there is no "-".
+seq_to_num(Seq) ->
+    {DashPos, _DashLen} = binary:match(Seq, <<"-">>),
+    binary_to_integer(binary:part(Seq, 0, DashPos)).
+
+
+% Converts an EJSON object ({Props}) or a bare proplist into a map by
+% round-tripping it through jiffy with return_maps, so nested objects become
+% maps too. Empty objects ({[]} or []) are accepted and yield #{}; the
+% previous clauses only matched non-empty lists and crashed on them.
+to_map(Props) when is_list(Props) ->
+    to_map({Props});
+
+to_map({Props} = EJson) when is_list(Props) ->
+    jiffy:decode(jiffy:encode(EJson), [return_maps]).
+
+
+% Creates DbName with the given fabric options in a helper process that
+% shares the caller's group leader.
+create_db(DbName, Opts) ->
+    Leader = erlang:group_leader(),
+    Create = fun() -> fabric:create_db(DbName, Opts) end,
+    with_proc(Create, Leader).
+
+
+% Deletes DbName, passing ?ADMIN_CTX, in a helper process that shares the
+% caller's group leader.
+delete_db(DbName) ->
+    Leader = erlang:group_leader(),
+    Delete = fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end,
+    with_proc(Delete, Leader).
+
+
+% Runs Fun in a separate, monitored process and returns its result. If Fun
+% crashes, the exit reason is re-raised in the caller via error/1. The
+% helper optionally adopts GroupLeader (undefined keeps the default). It is
+% killed if it has not finished within Timeout ms (default 30000), in which
+% case {with_proc_timeout, Fun, Timeout} is raised.
+with_proc(Fun) ->
+    with_proc(Fun, undefined).
+
+
+with_proc(Fun, GroupLeader) ->
+    with_proc(Fun, GroupLeader, 30000).
+
+
+with_proc(Fun, GroupLeader, Timeout) ->
+    Runner = fun() ->
+        if
+            GroupLeader =/= undefined ->
+                erlang:group_leader(GroupLeader, self());
+            true ->
+                ok
+        end,
+        % The result travels back as the exit reason of the helper
+        exit({with_proc_res, Fun()})
+    end,
+    {Pid, Ref} = spawn_monitor(Runner),
+    receive
+        {'DOWN', Ref, process, Pid, Reason} ->
+            case Reason of
+                {with_proc_res, Res} -> Res;
+                Error -> error(Error)
+            end
+    after Timeout ->
+        erlang:demonitor(Ref, [flush]),
+        exit(Pid, kill),
+        error({with_proc_timeout, Fun, Timeout})
+    end.
+
+
+% Creates the docs described by DocSpec in DbName. Every DocSpec key is an
+% optional inclusive [Start, End] id range:
+%   docs   - regular docs (with one attachment)
+%   mrview, search, geo - design docs of that type
+%   local  - _local docs
+%   delete - regular doc ids to delete after they are created
+% Both the creates and the deletes are asserted to succeed.
+add_test_docs(DbName, #{} = DocSpec) ->
+    Docs = docs(maps:get(docs, DocSpec, []))
+        ++ ddocs(mrview, maps:get(mrview, DocSpec, []))
+        ++ ddocs(search, maps:get(search, DocSpec, []))
+        ++ ddocs(geo, maps:get(geo, DocSpec, []))
+        ++ ldocs(maps:get(local, DocSpec, [])),
+    Res = update_docs(DbName, Docs),
+    % Record each doc's new revision so the deletions update the latest rev
+    Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
+        Doc#doc{revs = {RevPos, [Rev]}}
+    end, lists:zip(Docs, Res)),
+    case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
+        [] ->
+            ok;
+        [_ | _] = Deleted ->
+            % Check the deletion results like the create results above,
+            % instead of silently ignoring per-doc failures.
+            DelRes = update_docs(DbName, Deleted),
+            lists:foreach(fun({ok, _}) -> ok end, DelRes)
+    end,
+    ok.
+
+
+% Writes Docs to DbName through fabric, passing ?ADMIN_CTX, and returns the
+% per-doc results. Both ok and accepted responses count as success; any
+% other response crashes the helper process, and with_proc re-raises it.
+update_docs(DbName, Docs) ->
+    Update = fun() ->
+        case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
+            {ok, Results} -> Results;
+            {accepted, Results} -> Results
+        end
+    end,
+    with_proc(Update).
+
+
+% Returns the docs in Docs whose ids fall in the regular-doc id range
+% [S, E], each marked as deleted. An empty or invalid range yields [].
+delete_docs([S, E], Docs) when E >= S ->
+    ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
+    [Doc#doc{deleted = true} || Doc <- Docs, lists:member(Doc#doc.id, ToDelete)];
+delete_docs(_, _) ->
+    [].
+
+
+% Builds regular docs for ids S..E (inclusive); anything else yields [].
+docs([S, E]) when E >= S ->
+    lists:map(fun(I) -> doc(<<"">>, I) end, lists:seq(S, E));
+docs(_) ->
+    [].
+
+
+% Builds design docs "_design/<Type><NNNNN>" for ids S..E (inclusive), using
+% the ddprop/1 body for Type and no attachments; anything else yields [].
+ddocs(Type, [S, E]) when E >= S ->
+    Body = ddprop(Type),
+    Prefix = <<"_design/", (atom_to_binary(Type, utf8))/binary>>,
+    lists:map(fun(I) -> doc(Prefix, I, Body, 0) end, lists:seq(S, E));
+ddocs(_, _) ->
+    [].
+
+
+% Builds _local docs for ids S..E (inclusive), with the standard body and
+% no attachments; anything else yields [].
+ldocs([S, E]) when E >= S ->
+    Body = bodyprops(),
+    lists:map(fun(I) -> doc(<<"_local/">>, I, Body, 0) end, lists:seq(S, E));
+ldocs(_) ->
+    [].
+
+
+
+% Builds a #doc{} whose id is Pref followed by the zero-padded Id.
+% doc/2 uses the standard bodyprops/0 body and a 42-byte attachment.
+% doc/4 takes the body proplist and attachment size (0 = none) explicitly.
+doc(Pref, Id) ->
+    doc(Pref, Id, bodyprops(), 42).
+
+
+doc(Pref, Id, BodyProps, AttSize) ->
+    DocId = doc_id(Pref, Id),
+    Atts = atts(AttSize),
+    #doc{id = DocId, body = {BodyProps}, atts = Atts}.
+
+
+% Returns Pref with Id appended as a 5-digit, zero-padded decimal,
+% e.g. doc_id(<<"_local/">>, 7) -> <<"_local/00007">>.
+doc_id(Pref, Id) ->
+    Suffix = list_to_binary(io_lib:format("~5..0B", [Id])),
+    <<Pref/binary, Suffix/binary>>.
+
+
+% Design doc body (as a proplist) for each supported index type.
+ddprop(mrview) ->
+    MapFun = <<"function(d){emit(d);}">>,
+    [{<<"views">>, {[{<<"v1">>, {[{<<"map">>, MapFun}]}}]}}];
+
+ddprop(geo) ->
+    IndexFun = <<"function(d){if(d.g){st_index(d.g)}}">>,
+    Area = {[{<<"analyzer">>, <<"standard">>}, {<<"index">>, IndexFun}]},
+    [{<<"st_indexes">>, {[{<<"area">>, Area}]}}];
+
+ddprop(search) ->
+    % NOTE(review): this search index calls st_index(), the same function
+    % the geo index uses; a search index would normally call index(...).
+    % The tests in this module only store and compare these design docs,
+    % so it is kept as-is -- confirm whether that was intended.
+    IndexFun = <<"function(d){if(d.g){st_index(d.g.type)}}">>,
+    [{<<"indexes">>, {[{<<"types">>, {[{<<"index">>, IndexFun}]}}]}}].
+
+
+% Standard doc body: a "g" field holding a small GeoJSON-style Polygon.
+bodyprops() ->
+    Ring = [[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]],
+    Geometry = {[{<<"type">>, <<"Polygon">>}, {<<"coordinates">>, [Ring]}]},
+    [{<<"g">>, Geometry}].
+
+
+% Attachment list for a test doc: none for size 0, otherwise a single
+% "att" attachment whose data is Size bytes of "x".
+atts(0) ->
+    [];
+
+atts(Size) when is_integer(Size), Size >= 1 ->
+    Att = couch_att:new([
+        {name, <<"att">>},
+        {type, <<"app/binary">>},
+        {att_len, Size},
+        {data, binary:copy(<<"x">>, Size)}
+    ]),
+    [Att].