You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by to...@apache.org on 2020/09/10 18:34:06 UTC
[couchdb] 01/01: move remonitor code into DOWN message
This is an automated email from the ASF dual-hosted git repository.
tonysun83 pushed a commit to branch re-monitor-compaction-pid
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 1492947b26819df43f0e33517be7792eee4f9699
Author: Tony Sun <to...@gmail.com>
AuthorDate: Thu Sep 10 11:19:32 2020 -0700
move remonitor code into DOWN message
Smoosh monitors the compactor pid to determine when the compaction jobs
finishes, and uses this for its idea of concurrency. However, this isn't
accurate in the case where the compaction job has to re-spawn to catch up on
intervening changes since the same logical compaction job continues with
another pid and smoosh is not aware. In such cases, a smoosh channel with
concurrency one can start arbitrarily many additional database compaction jobs.
To solve this problem, we added a check to see if a compaction PID exists for
a db in `start_compact`. But this is the wrong approach because it’s for a
shard that comes off the queue. So it’s a different shard and the following
can occur:
1. Enqueue a bunch of stuff into channel with concurrency 1
2. Begin highest priority job, Shard1, in channel
3. Compaction finishes, discovers compaction file is behind main file
4. Smoosh-monitored PID for Shard1 exits, a new one starts to finish the job
5. Smoosh receives the 'DOWN' message, begins the next highest priority job,
Shard2
6. Channel concurrency is now 2, not 1
This change moves the check into the 'DOWN' message so that we can check for
that specific shard. If the compaction PID exists then it means a new process
was spawned and we just monitor that one and add it back to the queue. The
length of the queue does not change and therefore we won’t spawn new
compaction jobs.
---
src/smoosh/src/smoosh_channel.erl | 55 +++++++++++++++++++++++----------------
1 file changed, 33 insertions(+), 22 deletions(-)
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index d8a8d14..37d86ee 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -122,17 +122,18 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
#state{active=Active0, starting=Starting0} = State,
case lists:keytake(Job, 2, Active0) of
{value, {Key, _Pid}, Active1} ->
- couch_log:warning("exit for compaction of ~p: ~p", [
- smoosh_utils:stringify(Key), Reason]),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
- {noreply, maybe_start_compaction(State#state{active=Active1})};
+ State1 = maybe_remonitor_cpid(State#state{active=Active1}, Key,
+ Reason),
+ {noreply, maybe_start_compaction(State1)};
false ->
case lists:keytake(Ref, 1, Starting0) of
{value, {_, Key}, Starting1} ->
- couch_log:warning("failed to start compaction of ~p: ~p", [
- smoosh_utils:stringify(Key), Reason]),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
- {noreply, maybe_start_compaction(State#state{starting=Starting1})};
+ couch_log:warning("failed to start compaction of ~p: ~p",
+ [smoosh_utils:stringify(Key), Reason]),
+ {ok, _} = timer:apply_after(5000, smoosh_server, enqueue,
+ [Key]),
+ {noreply,
+ maybe_start_compaction(State#state{starting=Starting1})};
false ->
{noreply, State}
end
@@ -275,24 +276,34 @@ start_compact(State, Db) ->
case smoosh_utils:ignore_db(Db) of
false ->
DbPid = couch_db:get_pid(Db),
- Key = couch_db:name(Db),
- case couch_db:get_compactor_pid(Db) of
- nil ->
- Ref = erlang:monitor(process, DbPid),
- DbPid ! {'$gen_call', {self(), Ref}, start_compact},
- State#state{starting=[{Ref, Key}|State#state.starting]};
- % database is still compacting so we can just monitor the existing
- % compaction pid
- CPid ->
- couch_log:notice("Db ~s continuing compaction",
- [smoosh_utils:stringify(Key)]),
- erlang:monitor(process, CPid),
- State#state{active=[{Key, CPid}|State#state.active]}
- end;
+ Ref = erlang:monitor(process, DbPid),
+ DbPid ! {'$gen_call', {self(), Ref}, start_compact},
+ State#state{starting=[{Ref, couch_db:name(Db)}|State#state.starting]};
_ ->
false
end.
+maybe_remonitor_cpid(State, DbName, Reason) when is_binary(DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ case couch_db:get_compactor_pid(Db) of
+ nil ->
+ couch_log:warning("exit for compaction of ~p: ~p",
+ [smoosh_utils:stringify(DbName), Reason]),
+ {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [DbName]),
+ State;
+ CPid ->
+ couch_log:notice("Db ~s continuing compaction",
+ [smoosh_utils:stringify(DbName)]),
+ erlang:monitor(process, CPid),
+ State#state{active=[{DbName, CPid}|State#state.active]}
+ end;
+% not a database compaction, so ignore the pid check
+maybe_remonitor_cpid(State, Key, Reason) ->
+ couch_log:warning("exit for compaction of ~p: ~p",
+ [smoosh_utils:stringify(Key), Reason]),
+ {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+ State.
+
schedule_unpause() ->
WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
erlang:send_after(WaitSecs * 1000, self(), unpause).