Posted to commits@couchdb.apache.org by va...@apache.org on 2019/02/07 15:34:34 UTC
[couchdb] branch shard-split updated: Expand API tests to test job lifecycle
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/shard-split by this push:
new 709b615 Expand API tests to test job lifecycle
709b615 is described below
commit 709b615aafd089138310bba6892a04ee88edb9af
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Thu Feb 7 10:31:45 2019 -0500
Expand API tests to test job lifecycle
Also fix associated bugs
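For context, the lifecycle these tests exercise runs through the per-job state
endpoint. A minimal sketch of the stop/resume round-trip, not part of this
commit, assuming the test_request helper and the ?JSON/?AUTH macros the test
module below already uses:

    stop_then_resume_job(Top, Id) ->
        StUrl = Top ++ "_reshard/jobs/" ++ binary_to_list(Id) ++ "/state",
        % stop only this job; cluster-wide resharding state is untouched
        {ok, 200, _, _} = test_request:put(StUrl, [?JSON, ?AUTH],
            "{\"state\": \"stopped\"}"),
        % resume it; the job picks up from its last checkpointed split state
        {ok, 200, _, _} = test_request:put(StUrl, [?JSON, ?AUTH],
            "{\"state\": \"running\"}").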
---
src/mem3/src/mem3_reshard.erl | 49 ++++++-----
src/mem3/src/mem3_reshard_debug.erl | 31 +++++--
src/mem3/src/mem3_reshard_job.erl | 4 +-
src/mem3/test/mem3_reshard_api_test.erl | 140 +++++++++++++++++++++++++++-----
4 files changed, 175 insertions(+), 49 deletions(-)
diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
index 7e390ec..4ba0e6d 100644
--- a/src/mem3/src/mem3_reshard.erl
+++ b/src/mem3/src/mem3_reshard.erl
@@ -212,7 +212,7 @@ handle_call({stop, Reason}, _From, #state{state = running} = State) ->
state_info = info_update(reason, Reason, State#state.state_info)
},
ok = mem3_reshard_store:store_state(State1),
- [kill_job_int(Job) || Job <- running_jobs()],
+ [temporarily_stop_job(Job) || Job <- running_jobs()],
{reply, ok, State1};
handle_call({stop, _}, _From, State) ->
@@ -273,7 +273,8 @@ handle_call({resume_job, Id}, _From, State) ->
handle_call({stop_job, Id, Reason}, _From, State) ->
couch_log:notice("~p stop_job Id:~p Reason:~p", [?MODULE, Id, Reason]),
case job_by_id(Id) of
- #job{job_state = running} = Job ->
+ #job{job_state = JSt} = Job when JSt =:= running orelse JSt =:= new
+ orelse JSt =:= stopped ->
ok = stop_job_int(Job, stopped, Reason, State),
{reply, ok, State};
#job{} ->
@@ -371,11 +372,12 @@ code_change(_OldVsn, State, _Extra) ->
% Insert job in the ets table as a temporarily stopped job. This would happen
-% then job is reload or added when cluster-wide resharding is stopped.
+% when a job is reloaded or added while cluster-wide resharding is stopped.
-spec temporarily_stop_job(#job{}) -> #job{}.
temporarily_stop_job(Job) ->
- OldInfo = Job#job.state_info,
- Job1 = Job#job{
+ Job1 = kill_job_int(Job),
+ OldInfo = Job1#job.state_info,
+ Job2 = Job1#job{
job_state = stopped,
update_time = now_sec(),
start_time = 0,
@@ -383,8 +385,9 @@ temporarily_stop_job(Job) ->
pid = undefined,
ref = undefined
},
- true = ets:insert(?MODULE, update_job_history(Job1)),
- Job1.
+ Job3 = update_job_history(Job2),
+ true = ets:insert(?MODULE, Job3),
+ Job3.
-spec reload_jobs(#state{}) -> #state{}.
@@ -414,10 +417,10 @@ reload_job(#job{job_state = JS} = Job, #state{state = running} = State)
State
end;
-% The default case is to just load the job into the ets table. This would be
-% failed or completed jobs for example
+% If job is disabled individually (stopped by the user), is completed or failed
+% then simply load it into the ets table
reload_job(#job{job_state = JS} = Job, #state{} = State)
- when JS =:= failed orelse JS =:= completed ->
+ when JS =:= failed orelse JS =:= completed orelse JS =:= stopped ->
true = ets:insert(?MODULE, Job),
State.
@@ -460,9 +463,6 @@ spawn_job(#job{} = Job0) ->
-spec stop_job_int(#job{}, job_state(), term(), #state{}) -> ok.
-stop_job_int(#job{pid = undefined}, _JobState, _Reason, _State) ->
- ok;
-
stop_job_int(#job{} = Job, JobState, Reason, State) ->
couch_log:info("~p stop_job_int ~p newstate: ~p reason:~p", [?MODULE,
jobfmt(Job), JobState, Reason]),
@@ -473,6 +473,7 @@ stop_job_int(#job{} = Job, JobState, Reason, State) ->
state_info = [{reason, Reason}]
},
ok = mem3_reshard_store:store_job(State, Job2),
+ true = ets:insert(?MODULE, Job2),
couch_log:info("~p stop_job_int stopped ~p", [?MODULE, jobfmt(Job2)]),
ok.
@@ -652,7 +653,7 @@ jobs_by_dbname(Name) ->
-spec running_jobs() -> [#job{}].
running_jobs() ->
Pat = #job{job_state = running, _ = '_'},
- [Job || [Job] <- ets:match_object(?MODULE, Pat)].
+ ets:match_object(?MODULE, Pat).
-spec fail_jobs_for_deleted_sources([#job{}], #state{}) -> ok.
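The running_jobs/0 change above fixes a genuine bug rather than style:
ets:match_object/2 returns a flat list of matching records, while the removed
[Job || [Job] <- ...] comprehension expects the nested lists that ets:match/2
produces, so it filtered every record out and always returned []. A shell
sketch of the difference (record contents elided for illustration):

    1> ets:match_object(Tab, Pat).
    [#job{...}, #job{...}]
    2> [Job || [Job] <- ets:match_object(Tab, Pat)].
    []    % records are tuples, not one-element lists, so nothing matches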
@@ -690,13 +691,19 @@ checkpoint_int(#job{} = Job, State) ->
-spec report_int(#job{}) -> ok | not_found.
report_int(Job) ->
case ets:lookup(?MODULE, Job#job.id) of
- [#job{ref = Ref}] ->
- couch_log:notice("~p reported ~s", [?MODULE, jobfmt(Job)]),
- % We care over the reference used to monitor this job. The job
- % record coming in from the job itself won't have and if we just
- % ets:insert it we'd end up forgetting the old ref
- true = ets:insert(?MODULE, Job#job{ref = Ref}),
- ok;
+ [#job{ref = Ref, pid = OldPid}] ->
+ case Job#job.pid =:= OldPid of
+ true ->
+ couch_log:notice("~p reported ~s", [?MODULE, jobfmt(Job)]),
+ % Carry over the reference from ets as the #job{} coming
+ % from the job process won't have its own monitor ref.
+ true = ets:insert(?MODULE, Job#job{ref = Ref}),
+ ok;
+ false ->
+ LogMsg = "~p ignoring old job report ~p new job:~p",
+ couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), OldPid]),
+ not_found
+ end;
_ ->
couch_log:error("~p reporting : couldn't find ~p", [?MODULE, Job]),
not_found
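The report_int/1 change is the subtler fix in this file: a killed and
respawned job can have a stale report from its previous incarnation still in
flight, and without the pid comparison that report would be inserted over the
respawned job's ets row. The guard reduces to this check, shown in isolation
as an illustrative sketch (the #job{} record comes from mem3_reshard.hrl):

    % accept a report only when it comes from the currently tracked process
    same_incarnation(#job{pid = ReportedPid}, #job{pid = CurrentPid}) ->
        ReportedPid =:= CurrentPid.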
diff --git a/src/mem3/src/mem3_reshard_debug.erl b/src/mem3/src/mem3_reshard_debug.erl
index b276391..15bb6d2 100644
--- a/src/mem3/src/mem3_reshard_debug.erl
+++ b/src/mem3/src/mem3_reshard_debug.erl
@@ -19,6 +19,9 @@
-export([
+ state_intercept_init/0,
+ state_intercept_cleanup/0,
+ state_intercept/2,
start_test_job/0,
start_test_job/1,
create_test_db/1,
@@ -102,18 +105,34 @@ all_docs(DbName) ->
fabric:all_docs(DbName, Cb, ok, #mrargs{}).
+
+state_intercept_tid() ->
+ list_to_atom("mem3_reshard_" ++ erlang:pid_to_list(self())).
+
+
+state_intercept_cleanup() ->
+ meck:unload(),
+ catch ets:delete(state_intercept_tid()),
+ ok.
+
+
state_intercept(State, Fun) ->
- ets:insert(?MODULE, {State, Fun}).
+ Tid = state_intercept_tid(),
+ case ets:info(Tid) of
+ undefined -> state_intercept_init();
+ [_ | _] -> ok
+ end,
+ ets:insert(Tid, {State, Fun}).
+
state_intercept_init() ->
- meck:unload(),
- catch ets:delete(?MODULE),
- ets:new(?MODULE, [named_table, public, set]),
- meck:new(mem3_reshard, [passthrough]),
+ Tid = state_intercept_tid(),
+ ets:new(Tid, [named_table, public, set]),
+ catch meck:new(mem3_reshard, [passthrough]),
GL = erlang:group_leader(),
meck:expect(mem3_reshard, checkpoint,
fun(Server, #job{} = Job) ->
- case ets:lookup(?MODULE, Job#job.split_state) of
+ case ets:lookup(Tid, Job#job.split_state) of
[{_, Fun}] ->
{_, Ref} = spawn_monitor(fun() ->
erlang:group_leader(GL, self()),
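The debug changes above key the intercept table to the calling process instead
of one shared named table, so concurrent test runs no longer trample each
other's intercepts. A hypothetical usage sketch follows; the callback's arity
is not visible in this hunk, so a zero-arity fun is assumed, and
start_test_job/0 is taken from the export list:

    demo_intercept() ->
        % run this fun whenever a test job checkpoints in the topoff1 state
        mem3_reshard_debug:state_intercept(topoff1, fun() ->
            io:format("job reached topoff1~n")
        end),
        mem3_reshard_debug:start_test_job(),
        % ... wait for the job, make assertions, then tear down mocks/table:
        mem3_reshard_debug:state_intercept_cleanup().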
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index 41a33c4..f22720f 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -66,8 +66,8 @@ jobfmt(#job{} = Job) ->
pid = Pid
} = Job,
TargetCount = length(Targets),
- Msg = "#job{~s ~s /~B job:~s state:~s pid:~p}",
- Fmt = io_lib:format(Msg, [Id, Source, TargetCount, State, JobState, Pid]),
+ Msg = "#job{~s ~s /~B job_state:~s split_state:~s pid:~p}",
+ Fmt = io_lib:format(Msg, [Id, Source, TargetCount, JobState, State, Pid]),
lists:flatten(Fmt).
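The jobfmt/1 fix is about log readability: the old format string printed the
split state under the ambiguous label job: and the job state under state:.
With the explicit labels a running job now renders along these lines (the id,
shard name and pid are invented for illustration):

    #job{001-1a2b3c shards/00000000-7fffffff/db1.1549000000 /2 job_state:running split_state:topoff1 pid:<0.345.0>}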
diff --git a/src/mem3/test/mem3_reshard_api_test.erl b/src/mem3/test/mem3_reshard_api_test.erl
index a1055b4..2d5cf8f 100644
--- a/src/mem3/test/mem3_reshard_api_test.erl
+++ b/src/mem3/test/mem3_reshard_api_test.erl
@@ -15,6 +15,7 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
-define(USER, "mem3_reshard_api_test_admin").
@@ -27,6 +28,7 @@
-define(RESHARD, "_reshard/").
-define(JOBS, "_reshard/jobs/").
-define(STATE, "_reshard/state").
+-define(ID, <<"id">>).
setup() ->
@@ -39,11 +41,14 @@ setup() ->
create_db(Url ++ ?DB1 ++ "?q=1&n=1"),
create_db(Url ++ ?DB2 ++ "?q=1&n=1"),
create_db(Url ++ ?DB3 ++ "?q=2&n=1"),
+ %mem3_reshard_debug:state_intercept_init(),
Url.
teardown(Url) ->
+ meck:unload(),
mem3_reshard:reset_state(),
+ %mem3_reshard_debug:state_intercept_cleanup(),
delete_db(Url ++ ?DB1),
delete_db(Url ++ ?DB2),
delete_db(Url ++ ?DB3),
@@ -68,18 +73,20 @@ mem3_reshard_api_test_() ->
foreach,
fun setup/0, fun teardown/1,
[
- fun reshard_basics/1,
- fun reshard_create_job_basic/1,
- fun reshard_create_two_jobs/1,
- fun reshard_start_stop_cluster_basic/1,
- fun reshard_start_stop_cluster_with_a_job/1
+ fun basics/1,
+ fun create_job_basic/1,
+ fun create_two_jobs/1,
+ fun start_stop_cluster_basic/1,
+ fun start_stop_cluster_with_a_job/1,
+ fun individual_job_start_stop/1,
+ fun individual_job_stop_when_cluster_stopped/1
]
}
}
}.
-reshard_basics(Top) ->
+basics(Top) ->
?_test(begin
% GET /_reshard
?assertMatch({200, #{
@@ -107,17 +114,17 @@ reshard_basics(Top) ->
end).
-reshard_create_job_basic(Top) ->
+create_job_basic(Top) ->
?_test(begin
% POST /_reshard/jobs
{C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => <<?DB1>>}),
?assertEqual(201, C1),
?assertMatch([#{<<"ok">> := true, <<"id">> := J, <<"shard">> := S}]
when is_binary(J) andalso is_binary(S), R1),
- [#{<<"id">> := Id, <<"shard">> := Shard}] = R1,
+ [#{?ID := Id, <<"shard">> := Shard}] = R1,
% GET /_reshard/jobs
?assertMatch({200, #{
- <<"jobs">> := [#{<<"id">> := Id, <<"type">> := <<"split">>}],
+ <<"jobs">> := [#{?ID := Id, <<"type">> := <<"split">>}],
<<"offset">> := 0,
<<"total_rows">> := 1
}}, req(get, Top ++ ?JOBS)),
@@ -125,7 +132,7 @@ reshard_create_job_basic(Top) ->
{C2, R2} = req(get, Top ++ ?JOBS ++ ?b2l(Id)),
?assertEqual(200, C2),
ThisNode = atom_to_binary(node(), utf8),
- ?assertMatch(#{<<"id">> := Id}, R2),
+ ?assertMatch(#{?ID := Id}, R2),
?assertMatch(#{<<"type">> := <<"split">>}, R2),
?assertMatch(#{<<"source">> := Shard}, R2),
?assertMatch(#{<<"history">> := History} when length(History) > 1, R2),
@@ -154,19 +161,19 @@ reshard_create_job_basic(Top) ->
end).
-reshard_create_two_jobs(Top) ->
+create_two_jobs(Top) ->
?_test(begin
?assertMatch({201, [#{<<"ok">> := true}]},
req(post, Top ++ ?JOBS, #{type => split, db => <<?DB1>>})),
?assertMatch({201, [#{<<"ok">> := true}]},
req(post, Top ++ ?JOBS, #{type => split, db => <<?DB2>>})),
?assertMatch({200, #{
- <<"jobs">> := [#{<<"id">> := Id1}, #{<<"id">> := Id2}],
+ <<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}],
<<"offset">> := 0,
<<"total_rows">> := 2
}} when Id1 =/= Id2, req(get, Top ++ ?JOBS)),
?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)),
- {200, #{<<"jobs">> := [#{<<"id">> := Id1}, #{<<"id">> := Id2}]}} =
+ {200, #{<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}]}} =
req(get, Top ++ ?JOBS),
{200, #{<<"ok">> := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id1)),
?assertMatch({200, #{<<"total">> := 1}}, req(get, Top ++ ?RESHARD)),
@@ -175,7 +182,7 @@ reshard_create_two_jobs(Top) ->
end).
-reshard_start_stop_cluster_basic(Top) ->
+start_stop_cluster_basic(Top) ->
?_test(begin
Url = Top ++ ?STATE,
?assertMatch({200, #{
@@ -204,7 +211,7 @@ reshard_start_stop_cluster_basic(Top) ->
end).
-reshard_start_stop_cluster_with_a_job(Top) ->
+start_stop_cluster_with_a_job(Top) ->
?_test(begin
Url = Top ++ ?STATE,
?assertMatch({200, _}, req(put, Url, #{state => stopped})),
@@ -212,9 +219,9 @@ reshard_start_stop_cluster_with_a_job(Top) ->
% Can add jobs with global state stopped, they just won't be running
{201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => <<?DB1>>}),
?assertMatch([#{<<"ok">> := true}], R1),
- [#{<<"id">> := Id1}] = R1,
+ [#{?ID := Id1}] = R1,
{200, J1} = req(get, Top ++ ?JOBS ++ ?b2l(Id1)),
- ?assertMatch(#{<<"id">> := Id1, <<"job_state">> := <<"stopped">>}, J1),
+ ?assertMatch(#{?ID := Id1, <<"job_state">> := <<"stopped">>}, J1),
% Check summary stats
?assertMatch({200, #{
<<"state">> := <<"stopped">>,
@@ -231,18 +238,111 @@ reshard_start_stop_cluster_with_a_job(Top) ->
<<"total">> := 0
}}, req(get, Top ++ ?RESHARD)),
% Add same job again
- {201, [#{<<"id">> := Id2}]} = req(post, Top ++ ?JOBS, #{type => split,
+ {201, [#{?ID := Id2}]} = req(post, Top ++ ?JOBS, #{type => split,
db => <<?DB1>>}),
- ?assertMatch({200, #{<<"id">> := Id2, <<"job_state">> := <<"stopped">>}},
+ ?assertMatch({200, #{?ID := Id2, <<"job_state">> := <<"stopped">>}},
req(get, Top ++ ?JOBS ++ ?b2l(Id2))),
% Job should start after resharding is started on the cluster
?assertMatch({200, _}, req(put, Url, #{state => running})),
- ?assertMatch({200, #{<<"id">> := Id2, <<"job_state">> := JSt}}
+ ?assertMatch({200, #{?ID := Id2, <<"job_state">> := JSt}}
when JSt =/= <<"stopped">>, req(get, Top ++ ?JOBS ++ ?b2l(Id2))),
{200, #{<<"ok">> := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id2))
end).
+individual_job_start_stop(Top) ->
+ ?_test(begin
+ intercept_state(topoff1),
+ Body = #{type => split, db => <<?DB1>>},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+ StUrl = JobUrl ++ "/state",
+ % Wait for the job to start running and intercept it in topoff1 state
+ receive {JobPid, topoff1} -> ok end,
+ % Tell the intercept to never finish checkpointing so the job is left
+ % hanging forever in the running state
+ JobPid ! cancel,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+ {200, _} = req(put, StUrl, #{state => stopped}),
+ wait_state(StUrl, <<"stopped">>),
+ % Stop/start resharding globally and job should still stay stopped
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+ % Start the job again
+ ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
+ % Wait for the job to start running and intercept it in topoff1 state
+ receive {JobPid2, topoff1} -> ok end,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+ % Let it continue running and it should complete eventually
+ JobPid2 ! continue,
+ wait_state(StUrl, <<"completed">>),
+ ?assertMatch({200, #{<<"ok">> := true}}, req(delete, JobUrl))
+ end).
+
+
+individual_job_stop_when_cluster_stopped(Top) ->
+ ?_test(begin
+ intercept_state(topoff1),
+ Body = #{type => split, db => <<?DB1>>},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+ StUrl = JobUrl ++ "/state",
+ % Wait for the job to start running and intercept it in topoff1 state
+ receive {JobPid, topoff1} -> ok end,
+ % Tell the intercept to never finish checkpointing so the job is left
+ % hanging forever in the running state
+ JobPid ! cancel,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+ % Stop resharding globally
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ wait_state(StUrl, <<"stopped">>),
+ % Stop the job specifically
+ {200, _} = req(put, StUrl, #{state => stopped}),
+ % Job stays stopped
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+ % Set cluster to running again
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ % The job should stay stopped
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+ % Then it should be possible to resume the job and it should complete
+ ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
+ % Wait for the job to start running and intercept it in topoff1 state
+ receive {JobPid2, topoff1} -> ok end,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+ % Let it continue running and it should complete eventually
+ JobPid2 ! continue,
+ wait_state(StUrl, <<"completed">>),
+ ?assertMatch({200, #{<<"ok">> := true}}, req(delete, JobUrl))
+ end).
+
+
+intercept_state(State) ->
+ TestPid = self(),
+ meck:new(mem3_reshard_job, [passthrough]),
+ meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+ case Job#job.split_state of
+ State ->
+ TestPid ! {self(), State},
+ receive
+ continue -> meck:passthrough([Job]);
+ cancel -> ok
+ end;
+ _ ->
+ meck:passthrough([Job])
+ end
+ end).
+
+
+wait_state(Url, State) ->
+ test_util:wait(fun() ->
+ case req(get, Url) of
+ {200, #{<<"state">> := State}} -> ok;
+ {200, #{}} -> timer:sleep(250), wait
+ end
+ end).
+
+
create_db(Url) ->
{ok, Status, _, _} = test_request:put(Url, [?JSON, ?AUTH], "{}"),
?assert(Status =:= 201 orelse Status =:= 202).