You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/02/15 21:08:23 UTC

[couchdb] branch shard-split updated: When db is deleted, delete all completed jobs as well

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/shard-split by this push:
     new 2dcc021  When db is deleted, delete all completed jobs as well
2dcc021 is described below

commit 2dcc0216983be8f91f95c1782f307b4b8c0f6523
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Feb 15 16:08:14 2019 -0500

    When db is deleted, delete all completed jobs as well
---
 src/mem3/src/mem3_reshard.erl           | 80 +++++++++++++++++++++++++++------
 src/mem3/src/mem3_reshard.hrl           |  3 +-
 src/mem3/src/mem3_reshard_dbdoc.erl     |  2 +-
 src/mem3/test/mem3_reshard_api_test.erl | 23 +++++++++-
 4 files changed, 91 insertions(+), 17 deletions(-)

diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
index 2b968d1..7cd510c 100644
--- a/src/mem3/src/mem3_reshard.erl
+++ b/src/mem3/src/mem3_reshard.erl
@@ -170,11 +170,13 @@ init(_) ->
     couch_log:notice("~p start init()", [?MODULE]),
     EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true}],
     ?MODULE = ets:new(?MODULE, EtsOpts),
+    ManagerPid = self(),
     State = #state{
         state = running,
         state_info = [],
         update_time = now_sec(),
-        node = node()
+        node = node(),
+        db_monitor = spawn_link(fun() -> db_monitor(ManagerPid) end)
     },
     State1 = mem3_reshard_store:init(State, ?JOB_PREFIX, state_id()),
     State2 = mem3_reshard_store:load_state(State1),
@@ -184,6 +186,8 @@ init(_) ->
 
 terminate(Reason, State) ->
     couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, statefmt(State)]),
+    % The db monitor was started with spawn_link in init/1. Unlink from it
+    % before killing it; both calls are wrapped in catch so a failure here
+    % does not prevent the running jobs below from being killed.
+    catch unlink(State#state.db_monitor),
+    catch exit(State#state.db_monitor, kill),
     [kill_job_int(Job) || Job <- running_jobs()],
     ok.
 
@@ -279,19 +283,8 @@ handle_call({stop_job, Id, Reason}, _From, State) ->
             {reply, {error, not_found}, State}
     end;
 
-
 handle_call({remove_job, Id}, _From, State) ->
-    couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]),
-    case job_by_id(Id) of
-        #job{} = Job ->
-            ok = stop_job_int(Job, stopped, "Removed by user", State),
-            ok = mem3_reshard_store:delete_job(State, Id),
-            ets:delete(?MODULE, Job#job.id),
-            {reply, ok, State};
-        not_found ->
-            {reply, {error, not_found}, State}
-    end;
-
+    {reply, remove_job_int(Id, State), State};
 
 handle_call(get_state, _From, #state{state = GlobalState} = State) ->
     StateProps = mem3_reshard_store:state_to_ejson_props(State),
@@ -319,6 +312,11 @@ handle_call(Call, From, State) ->
     couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
     {noreply, State}.
 
+handle_cast({db_deleted, JobIds}, State) ->
+    LogMsg = "~p removing ~p jobs after db was deleted",
+    couch_log:notice(LogMsg, [?MODULE, length(JobIds)]),
+    [remove_job_int(JobId, State) || JobId <- JobIds],
+    {noreply, State};
 
 handle_cast({report, Job}, State) ->
     report_int(Job),
@@ -686,6 +684,20 @@ report_int(Job) ->
     end.
 
 
+% Remove the job with the given id. If the job is found via job_by_id/1,
+% kill_job_int/1 is called on it, it is deleted from mem3_reshard_store and
+% its entry is deleted from the ?MODULE ets table. Returns
+% {error, not_found} when job_by_id/1 does not find the id.
+%
+% The first argument is a job id, not a #job{} record: it is what
+% job_by_id/1 and mem3_reshard_store:delete_job/2 are called with.
+-spec remove_job_int(binary(), #state{}) -> ok | {error, not_found}.
+remove_job_int(Id, State) ->
+    couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]),
+    case job_by_id(Id) of
+        #job{} = Job ->
+            kill_job_int(Job),
+            ok = mem3_reshard_store:delete_job(State, Id),
+            ets:delete(?MODULE, Job#job.id),
+            ok;
+        not_found ->
+            {error, not_found}
+    end.
+
+
 % This function is for testing and debugging only
 -spec reset_state(#state{}) -> #state{}.
 reset_state(#state{} = State) ->
@@ -719,7 +731,6 @@ update_job_history(#job{job_state = St, update_time = Ts} = Job) ->
     Job#job{history = update_history(St, Reason, Ts, Hist)}.
 
 
-
 -spec update_history_rev(atom(), binary() | null, time_sec(), list()) -> list().
 update_history(State, State, Ts, History) ->
     % State is the same as detail. Make the detail null to avoid duplication
@@ -754,6 +765,47 @@ max_history() ->
     config:get_integer("reshard", "max_history", ?DEFAULT_MAX_HISTORY).
 
 
+
+% Return the ids of all jobs in the ?MODULE ets table whose job_state is
+% `completed` and whose source shard belongs to the given db. The name is
+% first passed through mem3:dbname/1 before matching.
+%
+% Note the result is a list of job ids, not #job{} records.
+-spec completed_jobs_by_dbname(binary()) -> [binary()].
+completed_jobs_by_dbname(Name) ->
+    DbName = mem3:dbname(Name),
+    Pat = #job{
+        job_state = completed,
+        source = #shard{dbname = DbName, _ = '_'},
+        _ = '_'
+    },
+    [JobId || #job{id = JobId} <- ets:match_object(?MODULE, Pat)].
+
+
+% Entry point of the db monitor process. Monitors couch_event_server,
+% registers this process with couch_event:register_all/1 and then enters
+% db_monitor_loop/2. Server is the pid that db_deleted casts are sent to.
+-spec db_monitor(pid()) -> no_return().
+db_monitor(Server) ->
+    couch_log:notice("~p db monitor ~p starting", [?MODULE, self()]),
+    % Monitor the event server so the loop is notified if it goes down
+    EvtRef = erlang:monitor(process, couch_event_server),
+    % Presumably subscribes this process to events for all dbs - confirm
+    % against couch_event
+    couch_event:register_all(self()),
+    db_monitor_loop(Server, EvtRef).
+
+
+% Receive loop of the db monitor process.
+%
+% On a `deleted` couch event, look up the completed jobs for that db and, if
+% there are any, cast {db_deleted, JobIds} to Server. All other couch events
+% are ignored. When the monitor identified by EvtRef reports 'DOWN', log and
+% exit with {db_monitor_died, Info}. Any other message is logged and dropped.
+-spec db_monitor_loop(pid(), reference()) -> no_return().
+db_monitor_loop(Server, EvtRef) ->
+    receive
+        {'$couch_event', DbName, deleted} ->
+            case completed_jobs_by_dbname(DbName) of
+                [] ->
+                    % Nothing to clean up, avoid sending an empty cast
+                    ok;
+                JobIds ->
+                    gen_server:cast(Server, {db_deleted, JobIds})
+            end,
+            db_monitor_loop(Server, EvtRef);
+        % Must come after the `deleted` clause above, which it would
+        % otherwise shadow
+        {'$couch_event', _, _} ->
+            db_monitor_loop(Server, EvtRef);
+        {'DOWN', EvtRef, _, _, Info} ->
+            couch_log:error("~p db monitor listener died ~p", [?MODULE, Info]),
+            exit({db_monitor_died, Info});
+        Msg ->
+            couch_log:error("~p db monitor unexpected msg ~p", [?MODULE, Msg]),
+            db_monitor_loop(Server, EvtRef)
+    end.
+
 -spec statefmt(#state{} | term()) -> string().
 statefmt(#state{state = StateName}) ->
     Total = ets:info(?MODULE, size),
diff --git a/src/mem3/src/mem3_reshard.hrl b/src/mem3/src/mem3_reshard.hrl
index 175377f..770dc74 100644
--- a/src/mem3/src/mem3_reshard.hrl
+++ b/src/mem3/src/mem3_reshard.hrl
@@ -69,7 +69,8 @@
     update_time :: non_neg_integer(),
     job_prefix :: binary(),
     state_id :: binary(),
-    node :: node()
+    node :: node(),
+    db_monitor :: pid()
 }).
 
 
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl
index 8dd6f1b..135f57f 100644
--- a/src/mem3/src/mem3_reshard_dbdoc.erl
+++ b/src/mem3/src/mem3_reshard_dbdoc.erl
@@ -48,7 +48,7 @@ update_shard_map(#job{source = Source, target = Target} = Job) ->
         end
     catch
         _:Err ->
-            exit({shard_update_error, Err})
+            exit(Err)
     end,
     LogMsg2 = "~p : update_shard_map on node returned. ~p",
     couch_log:notice(LogMsg2, [?MODULE, Node]),
diff --git a/src/mem3/test/mem3_reshard_api_test.erl b/src/mem3/test/mem3_reshard_api_test.erl
index 270c3e2..70640f3 100644
--- a/src/mem3/test/mem3_reshard_api_test.erl
+++ b/src/mem3/test/mem3_reshard_api_test.erl
@@ -101,7 +101,8 @@ mem3_reshard_api_test_() ->
                     fun recover_in_wait_source_close/1,
                     fun recover_in_topoff3/1,
                     fun recover_in_source_delete/1,
-                    fun check_max_jobs/1
+                    fun check_max_jobs/1,
+                    fun cleanup_completed_jobs/1
                 ]
             }
         }
@@ -680,6 +681,17 @@ check_max_jobs(Top) ->
     end).
 
 
+% Checks that a completed split job is removed once its db is deleted:
+% create a split job for ?DB1, wait for its state to become "completed",
+% delete the db, then poll the job URL until it returns 404.
+cleanup_completed_jobs(Top) ->
+    ?_test(begin
+        Body = #{type => split, db => <<?DB1>>},
+        {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+        JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+        wait_state(JobUrl ++ "/state", <<"completed">>),
+        delete_db(Top, ?DB1),
+        wait_for_http_code(JobUrl, 404)
+    end).
+
+
 % Test help functions
 
 wait_to_complete_then_cleanup(Top, Jobs) ->
@@ -729,6 +741,15 @@ wait_state(Url, State) ->
     end, 30000).
 
 
+% Poll Url with GET requests until the response status equals Code, handing
+% the polling fun to test_util:wait/2 with a 30000 timeout argument.
+wait_for_http_code(Url, Code) when is_integer(Code) ->
+    Poll = fun() ->
+        case req(get, Url) of
+            {Code, _} ->
+                ok;
+            {_, _} ->
+                % Not the expected status yet, pause briefly and retry
+                timer:sleep(100),
+                wait
+        end
+    end,
+    test_util:wait(Poll, 30000).
+
+
 delete_source_in_state(Top, Db, State) when is_atom(State) ->
     intercept_state(State),
     Body = #{type => split, db => list_to_binary(Db)},