You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/02/07 15:34:34 UTC

[couchdb] branch shard-split updated: Expand API tests to tests job lifecycle

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/shard-split by this push:
     new 709b615  Expand API tests to tests job lifecycle
709b615 is described below

commit 709b615aafd089138310bba6892a04ee88edb9af
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Thu Feb 7 10:31:45 2019 -0500

    Expand API tests to tests job lifecycle
    
    Also fix associated bugs
---
 src/mem3/src/mem3_reshard.erl           |  49 ++++++-----
 src/mem3/src/mem3_reshard_debug.erl     |  31 +++++--
 src/mem3/src/mem3_reshard_job.erl       |   4 +-
 src/mem3/test/mem3_reshard_api_test.erl | 140 +++++++++++++++++++++++++++-----
 4 files changed, 175 insertions(+), 49 deletions(-)

diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
index 7e390ec..4ba0e6d 100644
--- a/src/mem3/src/mem3_reshard.erl
+++ b/src/mem3/src/mem3_reshard.erl
@@ -212,7 +212,7 @@ handle_call({stop, Reason}, _From, #state{state = running} = State) ->
         state_info = info_update(reason, Reason, State#state.state_info)
     },
     ok = mem3_reshard_store:store_state(State1),
-    [kill_job_int(Job) || Job <- running_jobs()],
+    [temporarily_stop_job(Job) || Job <- running_jobs()],
     {reply, ok, State1};
 
 handle_call({stop, _}, _From, State) ->
@@ -273,7 +273,8 @@ handle_call({resume_job, Id}, _From, State) ->
 handle_call({stop_job, Id, Reason}, _From, State) ->
     couch_log:notice("~p stop_job Id:~p Reason:~p", [?MODULE, Id, Reason]),
     case job_by_id(Id) of
-        #job{job_state = running} = Job ->
+        #job{job_state = JSt} = Job when JSt =:= running orelse JSt =:= new
+                orelse JSt =:= stopped ->
             ok = stop_job_int(Job, stopped, Reason, State),
             {reply, ok, State};
         #job{} ->
@@ -371,11 +372,12 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 % Insert job in the ets table as a temporarily stopped job. This would happen
-% then job is reload or added when cluster-wide resharding is stopped.
+% when a job is reloaded or added while cluster-wide resharding is stopped.
 -spec temporarily_stop_job(#job{}) -> #job{}.
 temporarily_stop_job(Job) ->
-    OldInfo = Job#job.state_info,
-    Job1 = Job#job{
+    Job1 = kill_job_int(Job),
+    OldInfo = Job1#job.state_info,
+    Job2 = Job1#job{
         job_state = stopped,
         update_time = now_sec(),
         start_time = 0,
@@ -383,8 +385,9 @@ temporarily_stop_job(Job) ->
         pid = undefined,
         ref = undefined
     },
-    true = ets:insert(?MODULE, update_job_history(Job1)),
-    Job1.
+    Job3 = update_job_history(Job2),
+    true = ets:insert(?MODULE, Job3),
+    Job3.
 
 
 -spec reload_jobs(#state{}) -> #state{}.
@@ -414,10 +417,10 @@ reload_job(#job{job_state = JS} = Job, #state{state = running} = State)
             State
     end;
 
-% The default case is to just load the job into the ets table. This would be
-% failed or completed jobs for example
+% If job is disabled individually (stopped by the user), is completed or failed
+% then simply load it into the ets table
 reload_job(#job{job_state = JS} = Job, #state{} = State)
-        when JS =:= failed orelse JS =:= completed ->
+        when JS =:= failed orelse JS =:= completed orelse JS =:= stopped ->
     true = ets:insert(?MODULE, Job),
     State.
 
@@ -460,9 +463,6 @@ spawn_job(#job{} = Job0) ->
 
 
 -spec stop_job_int(#job{}, job_state(), term(), #state{}) -> ok.
-stop_job_int(#job{pid = undefined}, _JobState, _Reason, _State) ->
-    ok;
-
 stop_job_int(#job{} = Job, JobState, Reason, State) ->
     couch_log:info("~p stop_job_int ~p newstate: ~p reason:~p", [?MODULE,
         jobfmt(Job), JobState, Reason]),
@@ -473,6 +473,7 @@ stop_job_int(#job{} = Job, JobState, Reason, State) ->
         state_info = [{reason, Reason}]
     },
     ok = mem3_reshard_store:store_job(State, Job2),
+    true = ets:insert(?MODULE, Job2),
     couch_log:info("~p stop_job_int stopped ~p", [?MODULE, jobfmt(Job2)]),
     ok.
 
@@ -652,7 +653,7 @@ jobs_by_dbname(Name) ->
 -spec running_jobs() -> [#job{}].
 running_jobs() ->
     Pat = #job{job_state = running, _ = '_'},
-    [Job || [Job] <- ets:match_object(?MODULE, Pat)].
+    ets:match_object(?MODULE, Pat).
 
 
 -spec fail_jobs_for_deleted_sources([#job{}], #state{}) -> ok.
@@ -690,13 +691,19 @@ checkpoint_int(#job{} = Job, State) ->
 -spec report_int(#job{}) -> ok | not_found.
 report_int(Job) ->
     case ets:lookup(?MODULE, Job#job.id) of
-        [#job{ref = Ref}] ->
-            couch_log:notice("~p reported ~s", [?MODULE, jobfmt(Job)]),
-            % We care over the reference used to monitor this job. The job
-            % record coming in from the job itself won't have and if we just
-            % ets:insert it we'd end up forgetting the old ref
-            true = ets:insert(?MODULE, Job#job{ref = Ref}),
-            ok;
+        [#job{ref = Ref, pid = OldPid}] ->
+            case Job#job.pid =:= OldPid of
+                true ->
+                    couch_log:notice("~p reported ~s", [?MODULE, jobfmt(Job)]),
+                    % Carry over the reference from ets as the #job{} coming
+                    % from the job process won't have its own monitor ref.
+                    true = ets:insert(?MODULE, Job#job{ref = Ref}),
+                    ok;
+                false ->
+                    LogMsg = "~p ignoring old job report ~p new job:~p",
+                    couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), OldPid]),
+                    not_found
+            end;
         _ ->
             couch_log:error("~p reporting : couldn't find ~p", [?MODULE, Job]),
             not_found
diff --git a/src/mem3/src/mem3_reshard_debug.erl b/src/mem3/src/mem3_reshard_debug.erl
index b276391..15bb6d2 100644
--- a/src/mem3/src/mem3_reshard_debug.erl
+++ b/src/mem3/src/mem3_reshard_debug.erl
@@ -19,6 +19,9 @@
 
 
 -export([
+    state_intercept_init/0,
+    state_intercept_cleanup/0,
+    state_intercept/2,
     start_test_job/0,
     start_test_job/1,
     create_test_db/1,
@@ -102,18 +105,34 @@ all_docs(DbName) ->
     fabric:all_docs(DbName, Cb, ok, #mrargs{}).
 
 
+
+state_intercept_tid() ->
+     list_to_atom("mem3_reshard_" ++ erlang:pid_to_list(self())).
+
+
+state_intercept_cleanup() ->
+    meck:unload(),
+    catch ets:delete(state_intercept_tid()),
+    ok.
+
+
 state_intercept(State, Fun) ->
-    ets:insert(?MODULE, {State, Fun}).
+    Tid = state_intercept_tid(),
+    case ets:info(Tid) of
+        undefined -> state_intercept_init();
+        [_ | _] -> ok
+    end,
+    ets:insert(Tid, {State, Fun}).
+
 
 state_intercept_init() ->
-    meck:unload(),
-    catch ets:delete(?MODULE),
-    ets:new(?MODULE, [named_table, public, set]),
-    meck:new(mem3_reshard, [passthrough]),
+    Tid = state_intercept_tid(),
+    ets:new(Tid, [named_table, public, set]),
+    catch meck:new(mem3_reshard, [passthrough]),
     GL = erlang:group_leader(),
     meck:expect(mem3_reshard, checkpoint,
         fun(Server, #job{} = Job) ->
-            case ets:lookup(?MODULE, Job#job.split_state) of
+            case ets:lookup(Tid, Job#job.split_state) of
                 [{_, Fun}] ->
                     {_, Ref} = spawn_monitor(fun() ->
                         erlang:group_leader(GL, self()),
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index 41a33c4..f22720f 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -66,8 +66,8 @@ jobfmt(#job{} = Job) ->
         pid = Pid
     } = Job,
     TargetCount = length(Targets),
-    Msg = "#job{~s ~s /~B job:~s state:~s pid:~p}",
-    Fmt = io_lib:format(Msg, [Id, Source, TargetCount, State, JobState, Pid]),
+    Msg = "#job{~s ~s /~B job_state:~s split_state:~s pid:~p}",
+    Fmt = io_lib:format(Msg, [Id, Source, TargetCount, JobState, State, Pid]),
     lists:flatten(Fmt).
 
 
diff --git a/src/mem3/test/mem3_reshard_api_test.erl b/src/mem3/test/mem3_reshard_api_test.erl
index a1055b4..2d5cf8f 100644
--- a/src/mem3/test/mem3_reshard_api_test.erl
+++ b/src/mem3/test/mem3_reshard_api_test.erl
@@ -15,6 +15,7 @@
 
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
 
 
 -define(USER, "mem3_reshard_api_test_admin").
@@ -27,6 +28,7 @@
 -define(RESHARD, "_reshard/").
 -define(JOBS, "_reshard/jobs/").
 -define(STATE, "_reshard/state").
+-define(ID, <<"id">>).
 
 
 setup() ->
@@ -39,11 +41,14 @@ setup() ->
     create_db(Url ++ ?DB1 ++ "?q=1&n=1"),
     create_db(Url ++ ?DB2 ++ "?q=1&n=1"),
     create_db(Url ++ ?DB3 ++ "?q=2&n=1"),
+    %mem3_reshard_debug:state_intercept_init(),
     Url.
 
 
 teardown(Url) ->
+    meck:unload(),
     mem3_reshard:reset_state(),
+    %mem3_reshard_debug:state_intercept_cleanup(),
     delete_db(Url ++ ?DB1),
     delete_db(Url ++ ?DB2),
     delete_db(Url ++ ?DB3),
@@ -68,18 +73,20 @@ mem3_reshard_api_test_() ->
                 foreach,
                 fun setup/0, fun teardown/1,
                 [
-                    fun reshard_basics/1,
-                    fun reshard_create_job_basic/1,
-                    fun reshard_create_two_jobs/1,
-                    fun reshard_start_stop_cluster_basic/1,
-                    fun reshard_start_stop_cluster_with_a_job/1
+                    fun basics/1,
+                    fun create_job_basic/1,
+                    fun create_two_jobs/1,
+                    fun start_stop_cluster_basic/1,
+                    fun start_stop_cluster_with_a_job/1,
+                    fun individual_job_start_stop/1,
+                    fun individual_job_stop_when_cluster_stopped/1
                 ]
             }
         }
     }.
 
 
-reshard_basics(Top) ->
+basics(Top) ->
     ?_test(begin
         % GET /_reshard
         ?assertMatch({200, #{
@@ -107,17 +114,17 @@ reshard_basics(Top) ->
     end).
 
 
-reshard_create_job_basic(Top) ->
+create_job_basic(Top) ->
     ?_test(begin
         % POST /_reshard/jobs
         {C1, R1} = req(post, Top ++ ?JOBS,  #{type => split, db => <<?DB1>>}),
         ?assertEqual(201, C1),
         ?assertMatch([#{<<"ok">> := true, <<"id">> := J, <<"shard">> :=  S}]
             when is_binary(J) andalso is_binary(S), R1),
-        [#{<<"id">> := Id, <<"shard">> := Shard}] = R1,
+        [#{?ID := Id, <<"shard">> := Shard}] = R1,
         % GET /_reshard/jobs
         ?assertMatch({200, #{
-            <<"jobs">> := [#{<<"id">> := Id, <<"type">> := <<"split">>}],
+            <<"jobs">> := [#{?ID := Id, <<"type">> := <<"split">>}],
             <<"offset">> := 0,
             <<"total_rows">> := 1
         }}, req(get, Top ++ ?JOBS)),
@@ -125,7 +132,7 @@ reshard_create_job_basic(Top) ->
         {C2, R2} = req(get, Top ++ ?JOBS ++ ?b2l(Id)),
         ?assertEqual(200, C2),
         ThisNode = atom_to_binary(node(), utf8),
-        ?assertMatch(#{<<"id">> := Id}, R2),
+        ?assertMatch(#{?ID := Id}, R2),
         ?assertMatch(#{<<"type">> := <<"split">>}, R2),
         ?assertMatch(#{<<"source">> := Shard}, R2),
         ?assertMatch(#{<<"history">> := History} when length(History) > 1, R2),
@@ -154,19 +161,19 @@ reshard_create_job_basic(Top) ->
     end).
 
 
-reshard_create_two_jobs(Top) ->
+create_two_jobs(Top) ->
     ?_test(begin
         ?assertMatch({201, [#{<<"ok">> := true}]},
             req(post, Top ++ ?JOBS, #{type => split, db => <<?DB1>>})),
         ?assertMatch({201, [#{<<"ok">> := true}]},
             req(post, Top ++ ?JOBS, #{type => split, db => <<?DB2>>})),
         ?assertMatch({200, #{
-            <<"jobs">> := [#{<<"id">> := Id1}, #{<<"id">> := Id2}],
+            <<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}],
             <<"offset">> := 0,
             <<"total_rows">> := 2
         }} when Id1 =/= Id2, req(get, Top ++ ?JOBS)),
         ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)),
-        {200, #{<<"jobs">> := [#{<<"id">> := Id1}, #{<<"id">> := Id2}]}} =
+        {200, #{<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}]}} =
             req(get, Top ++ ?JOBS),
         {200, #{<<"ok">> := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id1)),
         ?assertMatch({200, #{<<"total">> := 1}}, req(get, Top ++ ?RESHARD)),
@@ -175,7 +182,7 @@ reshard_create_two_jobs(Top) ->
     end).
 
 
-reshard_start_stop_cluster_basic(Top) ->
+start_stop_cluster_basic(Top) ->
     ?_test(begin
         Url = Top ++ ?STATE,
         ?assertMatch({200, #{
@@ -204,7 +211,7 @@ reshard_start_stop_cluster_basic(Top) ->
     end).
 
 
-reshard_start_stop_cluster_with_a_job(Top) ->
+start_stop_cluster_with_a_job(Top) ->
     ?_test(begin
         Url = Top ++ ?STATE,
         ?assertMatch({200, _}, req(put, Url, #{state => stopped})),
@@ -212,9 +219,9 @@ reshard_start_stop_cluster_with_a_job(Top) ->
         % Can add jobs with global state stopped, they just won't be running
         {201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => <<?DB1>>}),
         ?assertMatch([#{<<"ok">> := true}], R1),
-        [#{<<"id">> := Id1}] = R1,
+        [#{?ID := Id1}] = R1,
         {200, J1} = req(get, Top ++ ?JOBS ++ ?b2l(Id1)),
-        ?assertMatch(#{<<"id">> := Id1, <<"job_state">> := <<"stopped">>}, J1),
+        ?assertMatch(#{?ID := Id1, <<"job_state">> := <<"stopped">>}, J1),
         % Check summary stats
         ?assertMatch({200, #{
             <<"state">> := <<"stopped">>,
@@ -231,18 +238,111 @@ reshard_start_stop_cluster_with_a_job(Top) ->
             <<"total">> := 0
         }}, req(get, Top ++ ?RESHARD)),
         % Add same job again
-        {201, [#{<<"id">> := Id2}]} = req(post, Top ++ ?JOBS, #{type => split,
+        {201, [#{?ID := Id2}]} = req(post, Top ++ ?JOBS, #{type => split,
             db => <<?DB1>>}),
-        ?assertMatch({200, #{<<"id">> := Id2, <<"job_state">> := <<"stopped">>}},
+        ?assertMatch({200, #{?ID := Id2, <<"job_state">> := <<"stopped">>}},
             req(get, Top ++ ?JOBS ++ ?b2l(Id2))),
         % Job should start after resharding is started on the cluster
         ?assertMatch({200, _}, req(put, Url, #{state => running})),
-        ?assertMatch({200, #{<<"id">> := Id2, <<"job_state">> := JSt}}
+        ?assertMatch({200, #{?ID := Id2, <<"job_state">> := JSt}}
             when JSt =/= <<"stopped">>, req(get, Top ++ ?JOBS ++ ?b2l(Id2))),
         {200, #{<<"ok">> := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id2))
      end).
 
 
+individual_job_start_stop(Top) ->
+    ?_test(begin
+        intercept_state(topoff1),
+        Body = #{type => split, db => <<?DB1>>},
+        {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+        JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+        StUrl =  JobUrl ++ "/state",
+        % Wait for the job to start running and intercept it in topoff1 state
+        receive {JobPid, topoff1} -> ok end,
+        % Tell the intercept to never finish checkpointing so job is left hanging
+        % forever in running state
+        JobPid ! cancel,
+        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+        {200, _} = req(put, StUrl, #{state => stopped}),
+        wait_state(StUrl, <<"stopped">>),
+        % Stop/start resharding globally and job should still stay stopped
+        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+        % Start the job again
+        ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
+        % Wait for the job to start running and intercept it in topoff1 state
+        receive {JobPid2, topoff1} -> ok end,
+        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+        % Let it continue running and it should complete eventually
+        JobPid2 ! continue,
+        wait_state(StUrl, <<"completed">>),
+        ?assertMatch({200, #{<<"ok">> := true}}, req(delete, JobUrl))
+    end).
+
+
+individual_job_stop_when_cluster_stopped(Top) ->
+    ?_test(begin
+        intercept_state(topoff1),
+        Body = #{type => split, db => <<?DB1>>},
+        {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+        JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+        StUrl =  JobUrl ++ "/state",
+        % Wait for the job to start running and intercept it in topoff1 state
+        receive {JobPid, topoff1} -> ok end,
+        % Tell the intercept to never finish checkpointing so job is left hanging
+        % forever in running state
+        JobPid ! cancel,
+        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+        % Stop resharding globally
+        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+        wait_state(StUrl, <<"stopped">>),
+        % Stop the job specifically
+        {200, _} = req(put, StUrl, #{state => stopped}),
+        % Job stays stopped
+        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+        % Set cluster to running again
+        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+        % The job should stay stopped
+        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+        % Then it should be possible to resume the job and it should complete
+        ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
+        % Wait for the job to start running and intercept it in topoff1 state
+        receive {JobPid2, topoff1} -> ok end,
+        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+        % Let it continue running and it should complete eventually
+        JobPid2 ! continue,
+        wait_state(StUrl, <<"completed">>),
+        ?assertMatch({200, #{<<"ok">> := true}}, req(delete, JobUrl))
+    end).
+
+
+intercept_state(State) ->
+    TestPid = self(),
+    meck:new(mem3_reshard_job, [passthrough]),
+    meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+            case Job#job.split_state of
+                State ->
+                    TestPid ! {self(), State},
+                    receive
+                        continue -> meck:passthrough([Job]);
+                        cancel -> ok
+                    end;
+                _ ->
+                   meck:passthrough([Job])
+            end
+        end).
+
+
+wait_state(Url, State) ->
+    test_util:wait(fun() ->
+            case req(get, Url) of
+                {200, #{<<"state">> := State}} -> ok;
+                {200, #{}} -> timer:sleep(250), wait
+            end
+    end).
+
+
 create_db(Url) ->
     {ok, Status, _, _} = test_request:put(Url, [?JSON, ?AUTH], "{}"),
     ?assert(Status =:= 201 orelse Status =:= 202).