You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/02/05 16:24:01 UTC

[couchdb] branch shard-split updated: Make _reshard/jobs closer to _scheduler/jobs

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/shard-split by this push:
     new 5772d4a  Make _reshard/jobs closer to _scheduler/jobs
5772d4a is described below

commit 5772d4aa6a49e10670ed5d8cefc0ad1349d18f0b
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Feb 5 11:22:44 2019 -0500

    Make _reshard/jobs closer to _scheduler/jobs
    
    Use type and timestamp instead of state and ts
    
    Rename the "state_history" field to "history".
---
 src/mem3/src/mem3_reshard.erl       | 104 +++++++++++++++++++++---------------
 src/mem3/src/mem3_reshard.hrl       |   8 +--
 src/mem3/src/mem3_reshard_httpd.erl |   2 +-
 src/mem3/src/mem3_reshard_job.erl   |  23 +++++---
 src/mem3/src/mem3_reshard_store.erl |  58 ++++++++++----------
 5 files changed, 110 insertions(+), 85 deletions(-)

diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
index 98b0fbe..9e83b94 100644
--- a/src/mem3/src/mem3_reshard.erl
+++ b/src/mem3/src/mem3_reshard.erl
@@ -30,7 +30,7 @@
   reset_state/0,
   db_monitor/1,
   now_sec/0,
-   update_history/3,
+   update_history/4,
 
   start_link/0,
 
@@ -76,13 +76,15 @@ start_split_job(#shard{} = Source, Split) ->
                         type = split,
                         job_state = new,
                         split_state = new,
-                        state_history = [{new, TStamp}],
+                        start_time = TStamp,
+                        update_time = TStamp,
                         node = node(),
                         source = Source,
                         targets = Targets
                    },
                    Job1 = Job#job{id = job_id(Job)},
-                   gen_server:call(?MODULE, {start_job, Job1}, infinity);
+                   Job2 = update_job_history(Job1),
+                   gen_server:call(?MODULE, {start_job, Job2}, infinity);
                 {error, Error} ->
                     {error, Error}
             end;
@@ -170,7 +172,7 @@ init(_) ->
     State = #state{
         state = running,
         state_info = [],
-        time_updated = now_sec(),
+        update_time = now_sec(),
         node = node(),
         db_monitor = spawn_link(?MODULE, db_monitor, [self()])
     },
@@ -191,7 +193,7 @@ terminate(Reason, State) ->
 handle_call(start, _From, #state{state = stopped} = State) ->
     State1 = State#state{
         state = running,
-        time_updated = now_sec(),
+        update_time = now_sec(),
         state_info = info_delete(reason, State#state.state_info)
     },
     ok = mem3_reshard_store:store_state(State1),
@@ -204,7 +206,7 @@ handle_call(start, _From, State) ->
 handle_call({stop, Reason}, _From, #state{state = running} = State) ->
     State1 = State#state{
         state = stopped,
-        time_updated = now_sec(),
+        update_time = now_sec(),
         state_info = info_update(reason, Reason, State#state.state_info)
     },
     ok = mem3_reshard_store:store_state(State1),
@@ -381,13 +383,13 @@ reload_job(#job{job_state = running} = Job, #state{state = stopped} = State) ->
     OldInfo = Job#job.state_info,
     Job1 = Job#job{
         job_state = stopped,
-        time_updated = now_sec(),
-        time_started = 0,
+        update_time = now_sec(),
+        start_time = 0,
         state_info = info_update(reason, <<"Shard splitting disabled">>, OldInfo),
         pid = undefined,
         ref = undefined
     },
-    true = ets:insert(?MODULE, update_job_state_history(Job1)),
+    true = ets:insert(?MODULE, update_job_history(Job1)),
     State;
 
 % This is a case when a job process should be spawend
@@ -422,7 +424,7 @@ get_start_delay_sec() ->
 start_job_int(Job, State) ->
     case spawn_job(Job) of
         {ok, #job{} = Job1} ->
-            Job2 = update_job_state_history(Job1),
+            Job2 = update_job_history(Job1),
             ok = mem3_reshard_store:store_job(State, Job2),
             true = ets:insert(?MODULE, Job2),
             ok;
@@ -434,8 +436,8 @@ start_job_int(Job, State) ->
 spawn_job(#job{} = Job0) ->
     Job = Job0#job{
         job_state = running,
-        time_started = 0,
-        time_updated = now_sec(),
+        start_time = 0,
+        update_time = now_sec(),
         state_info = info_delete(reason, Job0#job.state_info),
         manager = self(),
         workers = [],
@@ -459,7 +461,7 @@ stop_job_int(#job{} = Job, JobState, Reason, State) ->
     Job1 = kill_job_int(Job),
     Job2 = Job1#job{
         job_state = JobState,
-        time_updated = now_sec(),
+        update_time = now_sec(),
         state_info = [{reason, Reason}]
     },
     ok = mem3_reshard_store:store_job(State, Job2),
@@ -492,10 +494,10 @@ handle_job_exit(#job{split_state = completed} = Job, normal, State) ->
         pid = undefined,
         ref = undefined,
         job_state = completed,
-        time_updated = now_sec(),
+        update_time = now_sec(),
         state_info = []
     },
-    Job2 = update_job_state_history(Job1),
+    Job2 = update_job_history(Job1),
     ok = mem3_reshard_store:store_job(State, Job2),
     true = ets:insert(?MODULE, Job2),
     ok;
@@ -507,10 +509,10 @@ handle_job_exit(#job{job_state = running} = Job, normal, _State) ->
         pid = undefined,
         ref = undefined,
         job_state = stopped,
-        time_updated = now_sec(),
+        update_time = now_sec(),
         state_info = info_update(reason, <<"Job stopped">>, OldInfo)
     },
-    true = ets:insert(?MODULE, update_job_state_history(Job1)),
+    true = ets:insert(?MODULE, update_job_history(Job1)),
     ok;
 
 handle_job_exit(#job{job_state = running} = Job, shutdown, _State) ->
@@ -520,10 +522,10 @@ handle_job_exit(#job{job_state = running} = Job, shutdown, _State) ->
         pid = undefined,
         ref = undefined,
         job_state = stopped,
-        time_updated = now_sec(),
+        update_time = now_sec(),
         state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
     },
-    true = ets:insert(?MODULE, update_job_state_history(Job1)),
+    true = ets:insert(?MODULE, update_job_history(Job1)),
     ok;
 
 handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg},  _State) ->
@@ -533,10 +535,10 @@ handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg},  _State) ->
         pid = undefined,
         ref = undefined,
         job_state = stopped,
-        time_updated = now_sec(),
+        update_time = now_sec(),
         state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
     },
-    true = ets:insert(?MODULE, update_job_state_history(Job1)),
+    true = ets:insert(?MODULE, update_job_history(Job1)),
     ok;
 
 handle_job_exit(#job{} = Job, Error, State) ->
@@ -546,10 +548,10 @@ handle_job_exit(#job{} = Job, Error, State) ->
         pid = undefined,
         ref = undefined,
         job_state = failed,
-        time_updated = now_sec(),
+        update_time = now_sec(),
         state_info = info_update(reason, Error, OldInfo)
     },
-    Job2 = update_job_state_history(Job1),
+    Job2 = update_job_history(Job1),
     ok = mem3_reshard_store:store_job(State, Job2),
     true = ets:insert(?MODULE, Job2),
     ok.
@@ -730,27 +732,44 @@ reset_state(#state{} = State) ->
     end, State, Jobs).
 
 
--spec iso8601(non_neg_integer()) -> string().
-iso8601(UnixSec) ->
-    Base = calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}),
-    GSec = Base + UnixSec,
-    {{Y, Mon, D}, {H, Min, S}} = calendar:gregorian_seconds_to_datetime(GSec),
-    Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
-    lists:flatten(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
+-spec update_job_history(#job{}) -> #job{}.
+update_job_history(#job{job_state = St, update_time = Ts} = Job) ->
+    Hist = Job#job.history,
+    Reason = case couch_util:get_value(reason, Job#job.state_info) of
+        undefined -> null;
+        Val -> couch_util:to_binary(Val)
+    end,
+    Job#job{history = update_history(St, Reason, Ts, Hist)}.
+
+
+
+-spec update_history(atom(), atom() | binary() | null, time_sec(), list()) -> list().
+update_history(State, State, Ts, History) ->
+    % State is the same as detail. Make the detail null to avoid duplication
+    update_history(State, null, Ts, History);
 
+update_history(State, Detail, Ts, History) ->
+    % Reverse, so we can process the last event as the head using
+    % head matches, then after append and trimming, reverse again
+    Rev = lists:reverse(History),
+    UpdatedRev = update_history_rev(State, Detail, Ts, Rev),
+    TrimmedRev = lists:sublist(UpdatedRev, max_history()),
+    lists:reverse(TrimmedRev).
 
--spec update_job_state_history(#job{}) -> #job{}.
-update_job_state_history(#job{job_state = St, time_updated = Ts} = Job) ->
-    Hist = Job#job.state_history,
-    Job#job{state_history = update_history(St, Ts, Hist)}.
+update_history_rev(State, null, Ts, [{State, Detail, _} | Rest]) ->
+    % Same state, no new detail: keep the previous detail, bump the timestamp
+    [{State, Detail, Ts} | Rest];
 
+update_history_rev(State, Detail, Ts, [{State, Detail, _} | Rest]) ->
+    % State and detail were same as last event, just update the timestamp
+    [{State, Detail, Ts} | Rest];
 
--spec update_history(atom(), time_sec(), list()) -> list().
-update_history(State, Ts, [{State, _} | Rest]) ->
-    lists:sublist([{State, Ts} | Rest], max_history());
+%% Otherwise the state or the detail changed: record a new event. The list
+%% is reversed here, so prepending makes it the newest event once
+%% update_history/4 reverses the list back.
 
-update_history(State, Ts, History) ->
-    lists:sublist([{State, Ts} | History], max_history()).
+update_history_rev(State, Detail, Ts, History) ->
+    [{State, Detail, Ts} | History].
 
 
 -spec max_history() -> non_neg_integer().
@@ -762,14 +781,13 @@ max_history() ->
 statefmt(#state{} = State) ->
     #state{
         state = StateName,
-        time_updated = updated,
+        % update_time is intentionally not part of this summary string
         db_monitor = Pid
     } = State,
     Total = ets:info(?MODULE, size),
     Active = mem3_reshard_job_sup:count_children(),
-    UpdatedFmt = iso8601(Updated),
-    Msg = "#state{~s updated:~s total:~B active:~B mon:~p}",
-    Fmt = io_lib:format(Msg, [StateName, UpdatedFmt, Total, Active, Pid]),
+    Msg = "#state{~s total:~B active:~B mon:~p}",
+    Fmt = io_lib:format(Msg, [StateName, Total, Active, Pid]),
     lists:flatten(Fmt);
 
 statefmt(State) ->
diff --git a/src/mem3/src/mem3_reshard.hrl b/src/mem3/src/mem3_reshard.hrl
index 1987806..049c209 100644
--- a/src/mem3/src/mem3_reshard.hrl
+++ b/src/mem3/src/mem3_reshard.hrl
@@ -52,9 +52,9 @@
     job_state :: job_state(),
     split_state :: split_state(),
     state_info = [] :: [{atom(), any()}],
-    state_history = [] :: [{atom(), time_sec()}],
-    time_started = 0 :: non_neg_integer(),
-    time_updated = 0 :: non_neg_integer(),
+    history = [] :: [{atom(), atom() | binary() | null, time_sec()}],
+    start_time = 0 :: non_neg_integer(),
+    update_time = 0 :: non_neg_integer(),
     node :: node(),
     pid :: undefined | pid() | '$1' | '_',
     ref :: undefined | reference() | '_',
@@ -67,7 +67,7 @@
 -record(state, {
     state :: shard_split_main_state(),
     state_info :: [],
-    time_updated :: non_neg_integer(),
+    update_time :: non_neg_integer(),
     job_prefix :: binary(),
     state_id :: binary(),
     db_monitor :: pid(),
diff --git a/src/mem3/src/mem3_reshard_httpd.erl b/src/mem3/src/mem3_reshard_httpd.erl
index 8ed4755..e840cb1 100644
--- a/src/mem3/src/mem3_reshard_httpd.erl
+++ b/src/mem3/src/mem3_reshard_httpd.erl
@@ -130,7 +130,7 @@ handle_reshard_req(#httpd{method = 'POST',
 handle_reshard_req(#httpd{path_parts=[_, ?JOBS]} = Req) ->
     send_method_not_allowed(Req, "GET,HEAD,POST");
 
-handle_reshard_req(#httpd{path_parts=[_, _]} = Req) ->
+handle_reshard_req(#httpd{path_parts=[_, _]}) ->
     throw(not_found);
 
 
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index bd4d89a..d44e213 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -65,7 +65,7 @@ init([#job{} = Job0]) ->
     process_flag(trap_exit, true),
     Job = Job0#job{
         pid = self(),
-        time_started = mem3_reshard:now_sec(),
+        start_time = mem3_reshard:now_sec(),
         workers = [],
         retries = 0
     },
@@ -202,12 +202,12 @@ switch_state(#job{manager = ManagerPid} = Job0, NewState) ->
     Info2 = info_delete(reason, Info1),
     Job = Job0#job{
         split_state = NewState,
-        time_updated = mem3_reshard:now_sec(),
+        update_time = mem3_reshard:now_sec(),
         retries = 0,
         state_info = Info2,
         workers = []
     },
-    Job1 = update_split_state_history(Job),
+    Job1 = update_split_history(Job),
     ok = mem3_reshard:checkpoint(ManagerPid, check_state(Job1)),
     gen_server:cast(self(), do_state),
     Job1.
@@ -273,7 +273,7 @@ maybe_retry(#job{} = Job, _, Error) ->
 
 -spec report(#job{}) -> #job{}.
 report(#job{manager = ManagerPid} = Job) ->
-    Job1 = Job#job{time_updated = mem3_reshard:now_sec()},
+    Job1 = Job#job{update_time = mem3_reshard:now_sec()},
     ok = mem3_reshard:report(ManagerPid, Job1),
     Job1.
 
@@ -504,7 +504,14 @@ reset_targets(#job{source = Source, targets = Targets} = Job) ->
     Job.
 
 
--spec update_split_state_history(#job{}) -> #job{}.
-update_split_state_history(#job{split_state = St, time_updated = Ts} = Job) ->
-    Hist = Job#job.state_history,
-    Job#job{state_history = mem3_reshard:update_history(St, Ts, Hist)}.
+-spec update_split_history(#job{}) -> #job{}.
+update_split_history(#job{split_state = St, update_time = Ts} = Job) ->
+    Hist = Job#job.history,
+    JobSt = case St of
+        completed -> completed;
+        failed -> failed;
+        new -> new;
+        stopped -> stopped;
+        _ -> running
+    end,
+    Job#job{history = mem3_reshard:update_history(JobSt, St, Ts, Hist)}.
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl
index c6ea945..f2f2e9e 100644
--- a/src/mem3/src/mem3_reshard_store.erl
+++ b/src/mem3/src/mem3_reshard_store.erl
@@ -179,14 +179,14 @@ job_to_ejson_props(#job{} = Job) ->
 
 job_to_ejson_props(#job{source = Source, targets = Targets} = Job, Opts) ->
     Iso8601 = proplists:get_value(iso8601, Opts),
-    History = state_history_to_ejson(Job#job.state_history, Iso8601),
-    TimeStarted = case Iso8601 of
-        true -> unix_to_iso8601(Job#job.time_started);
-        _ -> Job#job.time_started
+    History = history_to_ejson(Job#job.history, Iso8601),
+    StartTime = case Iso8601 of
+        true -> iso8601(Job#job.start_time);
+        _ -> Job#job.start_time
     end,
-    TimeUpdated = case Iso8601 of
-        true -> unix_to_iso8601(Job#job.time_updated);
-        _ -> Job#job.time_updated
+    UpdateTime = case Iso8601 of
+        true -> iso8601(Job#job.update_time);
+        _ -> Job#job.update_time
     end,
     [
         {id, Job#job.id},
@@ -197,9 +197,9 @@ job_to_ejson_props(#job{source = Source, targets = Targets} = Job, Opts) ->
         {split_state, Job#job.split_state},
         {state_info, state_info_to_ejson(Job#job.state_info)},
         {node, atom_to_binary(Job#job.node, utf8)},
-        {time_started, TimeStarted},
-        {time_updated, TimeUpdated},
-        {state_history, History}
+        {start_time, StartTime},
+        {update_time, UpdateTime},
+        {history, History}
     ].
 
 
@@ -211,9 +211,9 @@ job_from_ejson({Props}) ->
     JobState = couch_util:get_value(<<"job_state">>, Props),
     SplitState = couch_util:get_value(<<"split_state">>, Props),
     StateInfo = couch_util:get_value(<<"state_info">>, Props),
-    TStarted = couch_util:get_value(<<"time_started">>, Props),
-    TUpdated = couch_util:get_value(<<"time_updated">>, Props),
-    History = couch_util:get_value(<<"state_history">>, Props),
+    TStarted = couch_util:get_value(<<"start_time">>, Props),
+    TUpdated = couch_util:get_value(<<"update_time">>, Props),
+    History = couch_util:get_value(<<"history">>, Props),
     #job{
         id = Id,
         type = binary_to_atom(Type, utf8),
@@ -221,11 +221,11 @@ job_from_ejson({Props}) ->
         split_state = binary_to_atom(SplitState, utf8),
         state_info = state_info_from_ejson(StateInfo),
         node = node(),
-        time_started = TStarted,
-        time_updated = TUpdated,
+        start_time = TStarted,
+        update_time = TUpdated,
         source = mem3_reshard:shard_from_name(Source),
         targets = [mem3_reshard:shard_from_name(T) || T <- Targets],
-        state_history = state_history_from_ejson(History)
+        history = history_from_ejson(History)
     }.
 
 
@@ -233,7 +233,7 @@ state_to_ejson_props(#state{} = State) ->
     [
         {state, atom_to_binary(State#state.state, utf8)},
         {state_info, state_info_to_ejson(State#state.state_info)},
-        {time_updated, State#state.time_updated},
+        {update_time, State#state.update_time},
         {node, atom_to_binary(State#state.node, utf8)}
     ].
 
@@ -241,12 +241,12 @@ state_to_ejson_props(#state{} = State) ->
 state_from_ejson(#state{} = State, {Props}) ->
     StateVal = couch_util:get_value(<<"state">>, Props),
     StateInfo = couch_util:get_value(<<"state_info">>, Props),
-    TUpdated = couch_util:get_value(<<"time_updated">>, Props),
+    TUpdated = couch_util:get_value(<<"update_time">>, Props),
     State#state{
         state = binary_to_atom(StateVal, utf8),
         state_info = state_info_from_ejson(StateInfo),
         node = node(),
-        time_updated = TUpdated
+        update_time = TUpdated
     }.
 
 
@@ -256,19 +256,19 @@ state_info_from_ejson({Props}) ->
     lists:sort(Props1).
 
 
-state_history_to_ejson(History, true) when is_list(History) ->
-    [{[{state, S}, {ts, unix_to_iso8601(T)}]} || {S, T} <- History];
+history_to_ejson(Hist, true) when is_list(Hist) ->
+    [{[{timestamp, iso8601(T)}, {type, S}, {detail, D}]} || {S, D, T} <- Hist];
 
-state_history_to_ejson(History, _) when is_list(History) ->
-    [{[{state, S}, {ts, T}]} || {S, T} <- History].
+history_to_ejson(Hist, _) when is_list(Hist) ->
+    [{[{timestamp, T}, {type, S}, {detail, D}]} || {S, D, T} <- Hist].
 
 
-
-state_history_from_ejson(HistoryEJson) when is_list(HistoryEJson) ->
+history_from_ejson(HistoryEJson) when is_list(HistoryEJson) ->
     lists:map(fun({EventProps}) ->
-        State = couch_util:get_value(<<"state">>, EventProps),
-        Timestamp = couch_util:get_value(<<"ts">>, EventProps),
-        {binary_to_atom(State, utf8), Timestamp}
+        Timestamp = couch_util:get_value(<<"timestamp">>, EventProps),
+        State = couch_util:get_value(<<"type">>, EventProps),
+        Detail = couch_util:get_value(<<"detail">>, EventProps),
+        {binary_to_atom(State, utf8), Detail, Timestamp} % {Type, Detail, Ts}
     end, HistoryEJson).
 
 
@@ -280,7 +280,7 @@ store_state() ->
     config:get_boolean("mem3_reshard", "store_state", true).
 
 
-unix_to_iso8601(UnixSec) ->
+iso8601(UnixSec) ->
     Mega = UnixSec div 1000000,
     Sec = UnixSec rem 1000000,
     {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}),