Posted to commits@couchdb.apache.org by rn...@apache.org on 2016/05/09 21:29:19 UTC

[2/4] mem3 commit: updated refs/heads/master to 0b70afb

Reduce frequency of mem3_sync:push/2 calls

In high-throughput scenarios on databases with large q values, the
mem3_sync event listener becomes overloaded with messages due to the
poor performance of the shard selection logic.

It's not strictly necessary to sync on every update, but we do need to
be careful not to lose updates by tracking pending updates too naively.
This patch adds a configurable delay and push frequency to reduce
pressure on the mem3_sync event listener.

COUCHDB-2984
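
A note on configuration: both knobs introduced by this patch live in the
[mem3] section of the server config. A minimal example using the defaults
from the patch (the values shown are illustrative, not recommendations):

    [mem3]
    ; roughly how long (ms) an updated shard may wait before being pushed
    sync_delay = 5000
    ; how often (ms) the oldest bucket of pending shards is pushed
    sync_frequency = 500

Both values can also be changed at runtime; the listener registers itself
as a config_listener and rebuckets any pending shards when either setting
changes.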


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/d5e0a4a1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/d5e0a4a1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/d5e0a4a1

Branch: refs/heads/master
Commit: d5e0a4a19de99b2c6a91c9de8a1bc120664e36d5
Parents: d3ce227
Author: Benjamin Anderson <b...@banjiewen.net>
Authored: Sat Apr 9 22:44:58 2016 -0700
Committer: Benjamin Anderson <b...@banjiewen.net>
Committed: Mon May 9 14:20:56 2016 -0700

----------------------------------------------------------------------
 src/mem3_sync_event_listener.erl | 176 ++++++++++++++++++++++++++++++----
 1 file changed, 155 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/d5e0a4a1/src/mem3_sync_event_listener.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync_event_listener.erl b/src/mem3_sync_event_listener.erl
index 7059347..d74c21f 100644
--- a/src/mem3_sync_event_listener.erl
+++ b/src/mem3_sync_event_listener.erl
@@ -12,6 +12,7 @@
 
 -module(mem3_sync_event_listener).
 -behavior(couch_event_listener).
+-behavior(config_listener).
 
 -export([
     start_link/0
@@ -25,24 +26,52 @@
     handle_info/2
 ]).
 
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
 -include_lib("mem3/include/mem3.hrl").
 
 -record(state, {
     nodes,
     shards,
-    users
+    users,
+    delay,
+    frequency,
+    last_push,
+    buckets
 }).
 
+%% Calling mem3_sync:push/2 on every update has a measurable performance cost,
+%% so we'd like to coalesce multiple update messages from couch_event into a
+%% single push call. Doing this while ensuring both correctness (i.e., no lost
+%% updates) and an even load profile is somewhat subtle. This implementation
+%% groups updated shards in a list of "buckets" (see bucket_shard/2) and
+%% guarantees that each shard is in no more than one bucket at a time - i.e.,
+%% any update messages received before the shard's current bucket has been
+%% pushed will be ignored - thereby reducing the frequency with which a single
+%% shard will be pushed. mem3_sync:push/2 is called on all shards in the
+%% *oldest* bucket roughly every mem3.sync_frequency milliseconds (see
+%% maybe_push_shards/1) to even out the load on mem3_sync.
+
 start_link() ->
     couch_event_listener:start_link(?MODULE, [], [all_dbs]).
 
 init(_) ->
-    State = #state{
+    config:listen_for_changes(?MODULE, undefined),
+    Delay = config:get_integer("mem3", "sync_delay", 5000),
+    Frequency = config:get_integer("mem3", "sync_frequency", 500),
+    Buckets = lists:duplicate(Delay div Frequency + 1, sets:new()),
+    St = #state{
         nodes = mem3_sync:nodes_db(),
         shards = mem3_sync:shards_db(),
-        users = mem3_sync:users_db()
+        users = mem3_sync:users_db(),
+        delay = Delay,
+        frequency = Frequency,
+        buckets = Buckets
     },
-    {ok, State}.
+    {ok, St}.
 
 terminate(_Reason, _State) ->
     ok.
@@ -51,35 +80,140 @@ handle_event(NodesDb, updated, #state{nodes = NodesDb} = St) ->
     Nodes = mem3:nodes(),
     Live = nodes(),
     [mem3_sync:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)],
-    {ok, St};
+    maybe_push_shards(St);
 handle_event(ShardsDb, updated, #state{shards = ShardsDb} = St) ->
     mem3_sync:push(ShardsDb, mem3_sync:find_next_node()),
-    {ok, St};
+    maybe_push_shards(St);
 handle_event(UsersDb, updated, #state{users = UsersDb} = St) ->
     mem3_sync:push(UsersDb, mem3_sync:find_next_node()),
-    {ok, St};
+    maybe_push_shards(St);
 handle_event(<<"shards/", _/binary>> = ShardName, updated, St) ->
-    try mem3:shards(mem3:dbname(ShardName)) of
-    Shards ->
-        Targets = [S || #shard{node=N, name=Name} = S <- Shards,
-            N =/= node(), Name =:= ShardName],
-        Live = nodes(),
-        [mem3_sync:push(ShardName,N) || #shard{node=N} <- Targets,
-            lists:member(N, Live)]
-    catch error:database_does_not_exist ->
-        ok
-    end,
-    {ok, St};
+    Buckets = bucket_shard(ShardName, St#state.buckets),
+    maybe_push_shards(St#state{buckets=Buckets});
 handle_event(<<"shards/", _:18/binary, _/binary>> = ShardName, deleted, St) ->
     mem3_sync:remove_shard(ShardName),
-    {ok, St};
+    maybe_push_shards(St);
 handle_event(_DbName, _Event, St) ->
-    {ok, St}.
+    maybe_push_shards(St).
 
+handle_cast({set_frequency, Frequency}, St) ->
+    #state{delay = Delay, buckets = Buckets0} = St,
+    Buckets1 = rebucket_shards(Delay, Frequency, Buckets0),
+    maybe_push_shards(St#state{frequency=Frequency, buckets=Buckets1});
+handle_cast({set_delay, Delay}, St) ->
+    #state{frequency = Frequency, buckets = Buckets0} = St,
+    Buckets1 = rebucket_shards(Delay, Frequency, Buckets0),
+    maybe_push_shards(St#state{delay=Delay, buckets=Buckets1});
 handle_cast(Msg, St) ->
     couch_log:notice("unexpected cast to mem3_sync_event_listener: ~p", [Msg]),
-    {ok, St}.
+    maybe_push_shards(St).
 
+handle_info(timeout, St) ->
+    maybe_push_shards(St);
 handle_info(Msg, St) ->
     couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]),
+    maybe_push_shards(St).
+
+handle_config_change("mem3", "sync_delay", Delay0, _, St) ->
+    try list_to_integer(Delay0) of
+        Delay1 ->
+            couch_event_listener:cast(
+                ?MODULE,
+                {set_delay, Delay1}
+            )
+    catch error:badarg ->
+        couch_log:warning(
+            "ignoring bad value for mem3.sync_delay: ~p",
+            [Delay0]
+        )
+    end,
+    {ok, St};
+handle_config_change("mem3", "sync_frequency", Frequency0, _, St) ->
+    try list_to_integer(Frequency0) of
+        Frequency1 ->
+            couch_event_listener:cast(
+                ?MODULE,
+                {set_frequency, Frequency1}
+            )
+    catch error:badarg ->
+        couch_log:warning(
+            "ignoring bad value for mem3.sync_frequency: ~p",
+            [Frequency0]
+        )
+    end,
+    {ok, St};
+handle_config_change(_, _, _, _, St) ->
     {ok, St}.
+
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(_Server, _Reason, St) ->
+    Fun = fun() ->
+        timer:sleep(5000),
+        config:listen_for_changes(?MODULE, St)
+    end,
+    spawn(Fun).
+
+bucket_shard(ShardName, [B|Bs]=Buckets0) ->
+    case waiting(ShardName, Buckets0) of
+        true -> Buckets0;
+        false -> [sets:add_element(ShardName, B)|Bs]
+    end.
+
+waiting(_, []) ->
+    false;
+waiting(ShardName, [B|Bs]) ->
+    case sets:is_element(ShardName, B) of
+        true -> true;
+        false -> waiting(ShardName, Bs)
+    end.
+
+rebucket_shards(Delay, Frequency, Buckets0) ->
+    case (Delay div Frequency + 1) - length(Buckets0) of
+        0 ->
+            Buckets0;
+        N when N < 0 ->
+            %% Reduce the number of buckets by merging the newest abs(N) + 1 together
+            {ToMerge, [B|Buckets1]} = lists:split(abs(N), Buckets0),
+            [sets:union([B|ToMerge])|Buckets1];
+        M ->
+            %% Extend the number of buckets by M
+            lists:duplicate(M, sets:new()) ++ Buckets0
+    end.
+
+%% To ensure that mem3_sync:push/2 is indeed called with roughly the frequency
+%% specified by #state.frequency, every message callback must return via a call
+%% to maybe_push_shards/1 rather than directly. All timing coordination - i.e.,
+%% calling mem3_sync:push/2 or setting a proper timeout to ensure that pending
+%% messages aren't dropped in case no further messages arrive - is handled here.
+maybe_push_shards(#state{last_push=undefined} = St) ->
+    {ok, St#state{last_push=os:timestamp()}, St#state.frequency};
+maybe_push_shards(St) ->
+    #state{frequency=Frequency, last_push=LastPush, buckets=Buckets0} = St,
+    Now = os:timestamp(),
+    Delta = timer:now_diff(Now, LastPush) div 1000,
+    case Delta > Frequency of
+        true ->
+            {Buckets1, [ToPush]} = lists:split(length(Buckets0) - 1, Buckets0),
+            Buckets2 = [sets:new()|Buckets1],
+            %% There's no sets:map/2!
+            sets:fold(
+                fun(ShardName, _) -> push_shard(ShardName) end,
+                undefined,
+                ToPush
+            ),
+            {ok, St#state{last_push=Now, buckets=Buckets2}, Frequency};
+        false ->
+            {ok, St, Frequency - Delta}
+    end.
+
+push_shard(ShardName) ->
+    try mem3:shards(mem3:dbname(ShardName)) of
+    Shards ->
+        Targets = [S || #shard{node=N, name=Name} = S <- Shards,
+            N =/= node(), Name =:= ShardName],
+        Live = nodes(),
+        [mem3_sync:push(ShardName,N) || #shard{node=N} <- Targets,
+            lists:member(N, Live)]
+    catch error:database_does_not_exist ->
+        ok
+    end.
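
For readers skimming the diff, here is a minimal, standalone sketch of the
bucketing scheme described in the comments above. The module and function
names (bucket_sketch, new/2, add/2, tick/1) are illustrative and not part
of the patch, and a real push goes through mem3_sync:push/2, simulated
here with io:format/2:

    -module(bucket_sketch).
    -export([new/2, add/2, tick/1]).

    %% Delay and Frequency in milliseconds, mirroring mem3.sync_delay and
    %% mem3.sync_frequency. One bucket per sync_frequency interval within
    %% the sync_delay window; the newest bucket sits at the head.
    new(Delay, Frequency) ->
        lists:duplicate(Delay div Frequency + 1, sets:new()).

    %% An updated shard lands in the newest bucket unless it is already
    %% waiting in some bucket, so a busy shard is pushed at most once per
    %% trip through the bucket list.
    add(Shard, [Newest | Rest] = Buckets) ->
        case lists:any(fun(B) -> sets:is_element(Shard, B) end, Buckets) of
            true  -> Buckets;
            false -> [sets:add_element(Shard, Newest) | Rest]
        end.

    %% Roughly every sync_frequency milliseconds the oldest bucket is
    %% drained and a fresh newest bucket is prepended; this is what
    %% maybe_push_shards/1 arranges with its timeout bookkeeping.
    tick(Buckets) ->
        {Rest, [Oldest]} = lists:split(length(Buckets) - 1, Buckets),
        [io:format("push ~s~n", [S]) || S <- sets:to_list(Oldest)],
        [sets:new() | Rest].

With the defaults (sync_delay = 5000, sync_frequency = 500) this yields 11
buckets, so a newly bucketed shard is pushed after roughly sync_delay
milliseconds even if no further events arrive, while repeated updates to
the same shard collapse into a single push.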