You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/02/05 16:24:01 UTC
[couchdb] branch shard-split updated: Make _reshard/jobs closer to
_scheduler/jobs
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch shard-split
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/shard-split by this push:
new 5772d4a Make _reshard/jobs closer to _scheduler/jobs
5772d4a is described below
commit 5772d4aa6a49e10670ed5d8cefc0ad1349d18f0b
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Feb 5 11:22:44 2019 -0500
Make _reshard/jobs closer to _scheduler/jobs
In job history entries, use "type" and "timestamp" instead of "state" and "ts".
Rename the "state_history" field to "history".
---
src/mem3/src/mem3_reshard.erl | 104 +++++++++++++++++++++---------------
src/mem3/src/mem3_reshard.hrl | 8 +--
src/mem3/src/mem3_reshard_httpd.erl | 2 +-
src/mem3/src/mem3_reshard_job.erl | 23 +++++---
src/mem3/src/mem3_reshard_store.erl | 58 ++++++++++----------
5 files changed, 110 insertions(+), 85 deletions(-)
diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
index 98b0fbe..9e83b94 100644
--- a/src/mem3/src/mem3_reshard.erl
+++ b/src/mem3/src/mem3_reshard.erl
@@ -30,7 +30,7 @@
reset_state/0,
db_monitor/1,
now_sec/0,
- update_history/3,
+ update_history/4,
start_link/0,
@@ -76,13 +76,15 @@ start_split_job(#shard{} = Source, Split) ->
type = split,
job_state = new,
split_state = new,
- state_history = [{new, TStamp}],
+ start_time = TStamp,
+ update_time = TStamp,
node = node(),
source = Source,
targets = Targets
},
Job1 = Job#job{id = job_id(Job)},
- gen_server:call(?MODULE, {start_job, Job1}, infinity);
+ Job2 = update_job_history(Job1),
+ gen_server:call(?MODULE, {start_job, Job2}, infinity);
{error, Error} ->
{error, Error}
end;
@@ -170,7 +172,7 @@ init(_) ->
State = #state{
state = running,
state_info = [],
- time_updated = now_sec(),
+ update_time = now_sec(),
node = node(),
db_monitor = spawn_link(?MODULE, db_monitor, [self()])
},
@@ -191,7 +193,7 @@ terminate(Reason, State) ->
handle_call(start, _From, #state{state = stopped} = State) ->
State1 = State#state{
state = running,
- time_updated = now_sec(),
+ update_time = now_sec(),
state_info = info_delete(reason, State#state.state_info)
},
ok = mem3_reshard_store:store_state(State1),
@@ -204,7 +206,7 @@ handle_call(start, _From, State) ->
handle_call({stop, Reason}, _From, #state{state = running} = State) ->
State1 = State#state{
state = stopped,
- time_updated = now_sec(),
+ update_time = now_sec(),
state_info = info_update(reason, Reason, State#state.state_info)
},
ok = mem3_reshard_store:store_state(State1),
@@ -381,13 +383,13 @@ reload_job(#job{job_state = running} = Job, #state{state = stopped} = State) ->
OldInfo = Job#job.state_info,
Job1 = Job#job{
job_state = stopped,
- time_updated = now_sec(),
- time_started = 0,
+ update_time = now_sec(),
+ start_time = 0,
state_info = info_update(reason, <<"Shard splitting disabled">>, OldInfo),
pid = undefined,
ref = undefined
},
- true = ets:insert(?MODULE, update_job_state_history(Job1)),
+ true = ets:insert(?MODULE, update_job_history(Job1)),
State;
% This is a case when a job process should be spawned
@@ -422,7 +424,7 @@ get_start_delay_sec() ->
start_job_int(Job, State) ->
case spawn_job(Job) of
{ok, #job{} = Job1} ->
- Job2 = update_job_state_history(Job1),
+ Job2 = update_job_history(Job1),
ok = mem3_reshard_store:store_job(State, Job2),
true = ets:insert(?MODULE, Job2),
ok;
@@ -434,8 +436,8 @@ start_job_int(Job, State) ->
spawn_job(#job{} = Job0) ->
Job = Job0#job{
job_state = running,
- time_started = 0,
- time_updated = now_sec(),
+ start_time = 0,
+ update_time = now_sec(),
state_info = info_delete(reason, Job0#job.state_info),
manager = self(),
workers = [],
@@ -459,7 +461,7 @@ stop_job_int(#job{} = Job, JobState, Reason, State) ->
Job1 = kill_job_int(Job),
Job2 = Job1#job{
job_state = JobState,
- time_updated = now_sec(),
+ update_time = now_sec(),
state_info = [{reason, Reason}]
},
ok = mem3_reshard_store:store_job(State, Job2),
@@ -492,10 +494,10 @@ handle_job_exit(#job{split_state = completed} = Job, normal, State) ->
pid = undefined,
ref = undefined,
job_state = completed,
- time_updated = now_sec(),
+ update_time = now_sec(),
state_info = []
},
- Job2 = update_job_state_history(Job1),
+ Job2 = update_job_history(Job1),
ok = mem3_reshard_store:store_job(State, Job2),
true = ets:insert(?MODULE, Job2),
ok;
@@ -507,10 +509,10 @@ handle_job_exit(#job{job_state = running} = Job, normal, _State) ->
pid = undefined,
ref = undefined,
job_state = stopped,
- time_updated = now_sec(),
+ update_time = now_sec(),
state_info = info_update(reason, <<"Job stopped">>, OldInfo)
},
- true = ets:insert(?MODULE, update_job_state_history(Job1)),
+ true = ets:insert(?MODULE, update_job_history(Job1)),
ok;
handle_job_exit(#job{job_state = running} = Job, shutdown, _State) ->
@@ -520,10 +522,10 @@ handle_job_exit(#job{job_state = running} = Job, shutdown, _State) ->
pid = undefined,
ref = undefined,
job_state = stopped,
- time_updated = now_sec(),
+ update_time = now_sec(),
state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
},
- true = ets:insert(?MODULE, update_job_state_history(Job1)),
+ true = ets:insert(?MODULE, update_job_history(Job1)),
ok;
handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg}, _State) ->
@@ -533,10 +535,10 @@ handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg}, _State) ->
pid = undefined,
ref = undefined,
job_state = stopped,
- time_updated = now_sec(),
+ update_time = now_sec(),
state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
},
- true = ets:insert(?MODULE, update_job_state_history(Job1)),
+ true = ets:insert(?MODULE, update_job_history(Job1)),
ok;
handle_job_exit(#job{} = Job, Error, State) ->
@@ -546,10 +548,10 @@ handle_job_exit(#job{} = Job, Error, State) ->
pid = undefined,
ref = undefined,
job_state = failed,
- time_updated = now_sec(),
+ update_time = now_sec(),
state_info = info_update(reason, Error, OldInfo)
},
- Job2 = update_job_state_history(Job1),
+ Job2 = update_job_history(Job1),
ok = mem3_reshard_store:store_job(State, Job2),
true = ets:insert(?MODULE, Job2),
ok.
@@ -730,27 +732,44 @@ reset_state(#state{} = State) ->
end, State, Jobs).
--spec iso8601(non_neg_integer()) -> string().
-iso8601(UnixSec) ->
- Base = calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}),
- GSec = Base + UnixSec,
- {{Y, Mon, D}, {H, Min, S}} = calendar:gregorian_seconds_to_datetime(GSec),
- Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
- lists:flatten(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
+-spec update_job_history(#job{}) -> #job{}.
+update_job_history(#job{job_state = St, update_time = Ts} = Job) ->
+ Hist = Job#job.history,
+ Reason = case couch_util:get_value(reason, Job#job.state_info) of
+ undefined -> null;
+ Val -> couch_util:to_binary(Val)
+ end,
+ Job#job{history = update_history(St, Reason, Ts, Hist)}.
+
+
+
+-spec update_history_rev(atom(), binary() | null, time_sec(), list()) -> list().
+update_history(State, State, Ts, History) ->
+ % State is the same as detail. Make the detail null to avoid duplication
+ update_history(State, null, Ts, History);
+update_history(State, Detail, Ts, History) ->
+ % Reverse, so we can process the last event as the head using
+ % head matches, then after appending and trimming, reverse again
+ Rev = lists:reverse(History),
+ UpdatedRev = update_history_rev(State, Detail, Ts, Rev),
+ TrimmedRev = lists:sublist(UpdatedRev, max_history()),
+ lists:reverse(TrimmedRev).
--spec update_job_state_history(#job{}) -> #job{}.
-update_job_state_history(#job{job_state = St, time_updated = Ts} = Job) ->
- Hist = Job#job.state_history,
- Job#job{state_history = update_history(St, Ts, Hist)}.
+update_history_rev(State, null, Ts, [{State, Detail, _} | Rest]) ->
+ % New detail is null and state is unchanged: keep the existing detail, just update the timestamp, no new entry added
+ [{State, Detail, Ts} | Rest];
+update_history_rev(State, Detail, Ts, [{State, Detail, _} | Rest]) ->
+ % State and detail were same as last event, just update the timestamp
+ [{State, Detail, Ts} | Rest];
--spec update_history(atom(), time_sec(), list()) -> list().
-update_history(State, Ts, [{State, _} | Rest]) ->
- lists:sublist([{State, Ts} | Rest], max_history());
+update_history_rev(State, Detail, Ts, [{State, Detail, _} | Rest]) ->
+ % State and detail were same as last event, just update the timestamp
+ [{State, Detail, Ts} | Rest];
-update_history(State, Ts, History) ->
- lists:sublist([{State, Ts} | History], max_history()).
+update_history_rev(State, Detail, Ts, History) ->
+ [{State, Detail, Ts} | History].
-spec max_history() -> non_neg_integer().
@@ -762,14 +781,13 @@ max_history() ->
statefmt(#state{} = State) ->
#state{
state = StateName,
- time_updated = Updated,
+ update_time = Updated,
db_monitor = Pid
} = State,
Total = ets:info(?MODULE, size),
Active = mem3_reshard_job_sup:count_children(),
- UpdatedFmt = iso8601(Updated),
- Msg = "#state{~s updated:~s total:~B active:~B mon:~p}",
- Fmt = io_lib:format(Msg, [StateName, UpdatedFmt, Total, Active, Pid]),
+ Msg = "#state{~s total:~B active:~B mon:~p}",
+ Fmt = io_lib:format(Msg, [StateName, Total, Active, Pid]),
lists:flatten(Fmt);
statefmt(State) ->
diff --git a/src/mem3/src/mem3_reshard.hrl b/src/mem3/src/mem3_reshard.hrl
index 1987806..049c209 100644
--- a/src/mem3/src/mem3_reshard.hrl
+++ b/src/mem3/src/mem3_reshard.hrl
@@ -52,9 +52,9 @@
job_state :: job_state(),
split_state :: split_state(),
state_info = [] :: [{atom(), any()}],
- state_history = [] :: [{atom(), time_sec()}],
- time_started = 0 :: non_neg_integer(),
- time_updated = 0 :: non_neg_integer(),
+ history = [] :: [{atom(), time_sec()}],
+ start_time = 0 :: non_neg_integer(),
+ update_time = 0 :: non_neg_integer(),
node :: node(),
pid :: undefined | pid() | '$1' | '_',
ref :: undefined | reference() | '_',
@@ -67,7 +67,7 @@
-record(state, {
state :: shard_split_main_state(),
state_info :: [],
- time_updated :: non_neg_integer(),
+ update_time :: non_neg_integer(),
job_prefix :: binary(),
state_id :: binary(),
db_monitor :: pid(),
diff --git a/src/mem3/src/mem3_reshard_httpd.erl b/src/mem3/src/mem3_reshard_httpd.erl
index 8ed4755..e840cb1 100644
--- a/src/mem3/src/mem3_reshard_httpd.erl
+++ b/src/mem3/src/mem3_reshard_httpd.erl
@@ -130,7 +130,7 @@ handle_reshard_req(#httpd{method = 'POST',
handle_reshard_req(#httpd{path_parts=[_, ?JOBS]} = Req) ->
send_method_not_allowed(Req, "GET,HEAD,POST");
-handle_reshard_req(#httpd{path_parts=[_, _]} = Req) ->
+handle_reshard_req(#httpd{path_parts=[_, _]}) ->
throw(not_found);
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index bd4d89a..d44e213 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -65,7 +65,7 @@ init([#job{} = Job0]) ->
process_flag(trap_exit, true),
Job = Job0#job{
pid = self(),
- time_started = mem3_reshard:now_sec(),
+ start_time = mem3_reshard:now_sec(),
workers = [],
retries = 0
},
@@ -202,12 +202,12 @@ switch_state(#job{manager = ManagerPid} = Job0, NewState) ->
Info2 = info_delete(reason, Info1),
Job = Job0#job{
split_state = NewState,
- time_updated = mem3_reshard:now_sec(),
+ update_time = mem3_reshard:now_sec(),
retries = 0,
state_info = Info2,
workers = []
},
- Job1 = update_split_state_history(Job),
+ Job1 = update_split_history(Job),
ok = mem3_reshard:checkpoint(ManagerPid, check_state(Job1)),
gen_server:cast(self(), do_state),
Job1.
@@ -273,7 +273,7 @@ maybe_retry(#job{} = Job, _, Error) ->
-spec report(#job{}) -> #job{}.
report(#job{manager = ManagerPid} = Job) ->
- Job1 = Job#job{time_updated = mem3_reshard:now_sec()},
+ Job1 = Job#job{update_time = mem3_reshard:now_sec()},
ok = mem3_reshard:report(ManagerPid, Job1),
Job1.
@@ -504,7 +504,14 @@ reset_targets(#job{source = Source, targets = Targets} = Job) ->
Job.
--spec update_split_state_history(#job{}) -> #job{}.
-update_split_state_history(#job{split_state = St, time_updated = Ts} = Job) ->
- Hist = Job#job.state_history,
- Job#job{state_history = mem3_reshard:update_history(St, Ts, Hist)}.
+-spec update_split_history(#job{}) -> #job{}.
+update_split_history(#job{split_state = St, update_time = Ts} = Job) ->
+ Hist = Job#job.history,
+ JobSt = case St of
+ completed -> completed;
+ failed -> failed;
+ new -> new;
+ stopped -> stopped;
+ _ -> running
+ end,
+ Job#job{history = mem3_reshard:update_history(JobSt, St, Ts, Hist)}.
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl
index c6ea945..f2f2e9e 100644
--- a/src/mem3/src/mem3_reshard_store.erl
+++ b/src/mem3/src/mem3_reshard_store.erl
@@ -179,14 +179,14 @@ job_to_ejson_props(#job{} = Job) ->
job_to_ejson_props(#job{source = Source, targets = Targets} = Job, Opts) ->
Iso8601 = proplists:get_value(iso8601, Opts),
- History = state_history_to_ejson(Job#job.state_history, Iso8601),
- TimeStarted = case Iso8601 of
- true -> unix_to_iso8601(Job#job.time_started);
- _ -> Job#job.time_started
+ History = history_to_ejson(Job#job.history, Iso8601),
+ StartTime = case Iso8601 of
+ true -> iso8601(Job#job.start_time);
+ _ -> Job#job.start_time
end,
- TimeUpdated = case Iso8601 of
- true -> unix_to_iso8601(Job#job.time_updated);
- _ -> Job#job.time_updated
+ UpdateTime = case Iso8601 of
+ true -> iso8601(Job#job.update_time);
+ _ -> Job#job.update_time
end,
[
{id, Job#job.id},
@@ -197,9 +197,9 @@ job_to_ejson_props(#job{source = Source, targets = Targets} = Job, Opts) ->
{split_state, Job#job.split_state},
{state_info, state_info_to_ejson(Job#job.state_info)},
{node, atom_to_binary(Job#job.node, utf8)},
- {time_started, TimeStarted},
- {time_updated, TimeUpdated},
- {state_history, History}
+ {start_time, StartTime},
+ {update_time, UpdateTime},
+ {history, History}
].
@@ -211,9 +211,9 @@ job_from_ejson({Props}) ->
JobState = couch_util:get_value(<<"job_state">>, Props),
SplitState = couch_util:get_value(<<"split_state">>, Props),
StateInfo = couch_util:get_value(<<"state_info">>, Props),
- TStarted = couch_util:get_value(<<"time_started">>, Props),
- TUpdated = couch_util:get_value(<<"time_updated">>, Props),
- History = couch_util:get_value(<<"state_history">>, Props),
+ TStarted = couch_util:get_value(<<"start_time">>, Props),
+ TUpdated = couch_util:get_value(<<"update_time">>, Props),
+ History = couch_util:get_value(<<"history">>, Props),
#job{
id = Id,
type = binary_to_atom(Type, utf8),
@@ -221,11 +221,11 @@ job_from_ejson({Props}) ->
split_state = binary_to_atom(SplitState, utf8),
state_info = state_info_from_ejson(StateInfo),
node = node(),
- time_started = TStarted,
- time_updated = TUpdated,
+ start_time = TStarted,
+ update_time = TUpdated,
source = mem3_reshard:shard_from_name(Source),
targets = [mem3_reshard:shard_from_name(T) || T <- Targets],
- state_history = state_history_from_ejson(History)
+ history = history_from_ejson(History)
}.
@@ -233,7 +233,7 @@ state_to_ejson_props(#state{} = State) ->
[
{state, atom_to_binary(State#state.state, utf8)},
{state_info, state_info_to_ejson(State#state.state_info)},
- {time_updated, State#state.time_updated},
+ {update_time, State#state.update_time},
{node, atom_to_binary(State#state.node, utf8)}
].
@@ -241,12 +241,12 @@ state_to_ejson_props(#state{} = State) ->
state_from_ejson(#state{} = State, {Props}) ->
StateVal = couch_util:get_value(<<"state">>, Props),
StateInfo = couch_util:get_value(<<"state_info">>, Props),
- TUpdated = couch_util:get_value(<<"time_updated">>, Props),
+ TUpdated = couch_util:get_value(<<"update_time">>, Props),
State#state{
state = binary_to_atom(StateVal, utf8),
state_info = state_info_from_ejson(StateInfo),
node = node(),
- time_updated = TUpdated
+ update_time = TUpdated
}.
@@ -256,19 +256,19 @@ state_info_from_ejson({Props}) ->
lists:sort(Props1).
-state_history_to_ejson(History, true) when is_list(History) ->
- [{[{state, S}, {ts, unix_to_iso8601(T)}]} || {S, T} <- History];
+history_to_ejson(Hist, true) when is_list(Hist) ->
+ [{[{timestamp, iso8601(T)}, {type, S}, {detail, D}]} || {S, D, T} <- Hist];
-state_history_to_ejson(History, _) when is_list(History) ->
- [{[{state, S}, {ts, T}]} || {S, T} <- History].
+history_to_ejson(Hist, _) when is_list(Hist) ->
+ [{[{timestamp, T}, {type, S}, {detail, D}]} || {S, D, T} <- Hist].
-
-state_history_from_ejson(HistoryEJson) when is_list(HistoryEJson) ->
+history_from_ejson(HistoryEJson) when is_list(HistoryEJson) ->
lists:map(fun({EventProps}) ->
- State = couch_util:get_value(<<"state">>, EventProps),
- Timestamp = couch_util:get_value(<<"ts">>, EventProps),
- {binary_to_atom(State, utf8), Timestamp}
+ Timestamp = couch_util:get_value(<<"timestamp">>, EventProps),
+ State = couch_util:get_value(<<"type">>, EventProps),
+ Detail = couch_util:get_value(<<"detail">>, EventProps),
+ {Timestamp, binary_to_atom(State, utf8), Detail}
end, HistoryEJson).
@@ -280,7 +280,7 @@ store_state() ->
config:get_boolean("mem3_reshard", "store_state", true).
-unix_to_iso8601(UnixSec) ->
+iso8601(UnixSec) ->
Mega = UnixSec div 1000000,
Sec = UnixSec rem 1000000,
{{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}),