You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/27 18:17:46 UTC
[1/2] mem3 commit: updated
refs/heads/COUCHDB-3287-pluggable-storage-engines to 5a1d85b [Forced Update!]
Repository: couchdb-mem3
Updated Branches:
refs/heads/COUCHDB-3287-pluggable-storage-engines aae55e889 -> 5a1d85bce (forced update)
Fix stale shards cache
There's a race condition in mem3_shards that can result in having shards
in the cache for a database that's been deleted. This results in a
confused cluster that thinks a database exists until you attempt to open
it.
The fix is to ignore any cache insert request that comes from an older
version (update sequence) of the dbs db than the one the mem3_shards cache knows about.
Big thanks to @jdoane for the identification and original patch.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/55263215
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/55263215
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/55263215
Branch: refs/heads/COUCHDB-3287-pluggable-storage-engines
Commit: 552632156416883c5382d8662251f22c16bd3ece
Parents: 58aadeb
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Feb 24 12:55:37 2017 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Mar 23 08:43:43 2017 -0500
----------------------------------------------------------------------
src/mem3_shards.erl | 54 ++++++++++++++++++++++++++++++++++++++----------
1 file changed, 43 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/55263215/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index accede9..22eb6de 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -27,7 +27,8 @@
-record(st, {
max_size = 25000,
cur_size = 0,
- changes_pid
+ changes_pid,
+ update_seq
}).
-include_lib("mem3/include/mem3.hrl").
@@ -181,11 +182,12 @@ init([]) ->
ets:new(?ATIMES, [ordered_set, protected, named_table]),
ok = config:listen_for_changes(?MODULE, nil),
SizeList = config:get("mem3", "shard_cache_size", "25000"),
- {Pid, _} = spawn_monitor(fun() -> listen_for_changes(get_update_seq()) end),
+ UpdateSeq = get_update_seq(),
{ok, #st{
max_size = list_to_integer(SizeList),
cur_size = 0,
- changes_pid = Pid
+ changes_pid = start_changes_listener(UpdateSeq),
+ update_seq = UpdateSeq
}}.
handle_call({set_max_size, Size}, _From, St) ->
@@ -200,12 +202,23 @@ handle_cast({cache_hit, DbName}, St) ->
couch_stats:increment_counter([mem3, shard_cache, hit]),
cache_hit(DbName),
{noreply, St};
-handle_cast({cache_insert, DbName, Shards}, St) ->
+handle_cast({cache_insert, DbName, Shards, UpdateSeq}, St) ->
couch_stats:increment_counter([mem3, shard_cache, miss]),
- {noreply, cache_free(cache_insert(St, DbName, Shards))};
+ NewSt = case UpdateSeq < St#st.update_seq of
+ true -> St;
+ false -> cache_free(cache_insert(St, DbName, Shards))
+ end,
+ {noreply, NewSt};
handle_cast({cache_remove, DbName}, St) ->
couch_stats:increment_counter([mem3, shard_cache, eviction]),
{noreply, cache_remove(St, DbName)};
+handle_cast({cache_insert_change, DbName, Shards, UpdateSeq}, St) ->
+ Msg = {cache_insert, DbName, Shards, UpdateSeq},
+ {noreply, NewSt} = handle_cast(Msg, St),
+ {noreply, NewSt#st{update_seq = UpdateSeq}};
+handle_cast({cache_remove_change, DbName, UpdateSeq}, St) ->
+ {noreply, NewSt} = handle_cast({cache_remove, DbName}, St),
+ {noreply, NewSt#st{update_seq = UpdateSeq}};
handle_cast(_Msg, St) ->
{noreply, St}.
@@ -222,8 +235,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) ->
erlang:send_after(5000, self(), {start_listener, Seq}),
{noreply, NewSt#st{changes_pid=undefined}};
handle_info({start_listener, Seq}, St) ->
- {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
- {noreply, St#st{changes_pid=NewPid}};
+ {noreply, St#st{
+ changes_pid = start_changes_listener(Seq)
+ }};
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, nil),
{noreply, State};
@@ -239,6 +253,21 @@ code_change(_OldVsn, #st{}=St, _Extra) ->
%% internal functions
+start_changes_listener(SinceSeq) ->
+ Self = self(),
+ {Pid, _} = erlang:spawn_monitor(fun() ->
+ erlang:spawn_link(fun() ->
+ Ref = erlang:monitor(process, Self),
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ end,
+ exit(shutdown)
+ end),
+ listen_for_changes(SinceSeq)
+ end),
+ Pid.
+
fold_fun(#full_doc_info{}=FDI, Acc) ->
DI = couch_doc:to_doc_info(FDI),
fold_fun(DI, Acc);
@@ -278,10 +307,11 @@ changes_callback({stop, EndSeq}, _) ->
exit({seq, EndSeq});
changes_callback({change, {Change}, _}, _) ->
DbName = couch_util:get_value(<<"id">>, Change),
+ Seq = couch_util:get_value(<<"seq">>, Change),
case DbName of <<"_design/", _/binary>> -> ok; _Else ->
case mem3_util:is_deleted(Change) of
true ->
- gen_server:cast(?MODULE, {cache_remove, DbName});
+ gen_server:cast(?MODULE, {cache_remove_change, DbName, Seq});
false ->
case couch_util:get_value(doc, Change) of
{error, Reason} ->
@@ -289,13 +319,14 @@ changes_callback({change, {Change}, _}, _) ->
[DbName, Reason]);
{Doc} ->
Shards = mem3_util:build_ordered_shards(DbName, Doc),
- gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+ Msg = {cache_insert_change, DbName, Shards, Seq},
+ gen_server:cast(?MODULE, Msg),
[create_if_missing(mem3:name(S), mem3:engine(S)) || S
<- Shards, mem3:node(S) =:= node()]
end
end
end,
- {ok, couch_util:get_value(<<"seq">>, Change)};
+ {ok, Seq};
changes_callback(timeout, _) ->
ok.
@@ -311,8 +342,9 @@ load_shards_from_disk(DbName) when is_binary(DbName) ->
load_shards_from_db(ShardDb, DbName) ->
case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of
{ok, #doc{body = {Props}}} ->
+ Seq = couch_db:get_update_seq(ShardDb),
Shards = mem3_util:build_ordered_shards(DbName, Props),
- gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+ gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq}),
Shards;
{not_found, _} ->
erlang:error(database_does_not_exist, ?b2l(DbName))
[2/2] mem3 commit: updated
refs/heads/COUCHDB-3287-pluggable-storage-engines to 5a1d85b
Posted by da...@apache.org.
Avoid overloading mem3_shards
This attempts to prevent overload on mem3_shards by skipping cache
insertions when the mem3_shards gen_server has a mailbox with five
thousand or more messages.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/5a1d85bc
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/5a1d85bc
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/5a1d85bc
Branch: refs/heads/COUCHDB-3287-pluggable-storage-engines
Commit: 5a1d85bce7ef6f5330c02e1525ea52496c967011
Parents: 5526321
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Mar 27 13:16:05 2017 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Mon Mar 27 13:16:05 2017 -0500
----------------------------------------------------------------------
src/mem3_shards.erl | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/5a1d85bc/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 22eb6de..329f9ed 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -344,7 +344,12 @@ load_shards_from_db(ShardDb, DbName) ->
{ok, #doc{body = {Props}}} ->
Seq = couch_db:get_update_seq(ShardDb),
Shards = mem3_util:build_ordered_shards(DbName, Props),
- gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq}),
+ case erlang:process_info(whereis(?MODULE), message_queue_len) of
+ {_, N} when is_integer(N), N < 5000 ->
+ gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq});
+ _ ->
+ ok
+ end,
Shards;
{not_found, _} ->
erlang:error(database_does_not_exist, ?b2l(DbName))