You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/09/25 19:34:19 UTC

[couchdb] branch master updated: Do not buffer rexi messages to disconnected nodes

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eaf178  Do not buffer rexi messages to disconnected nodes
1eaf178 is described below

commit 1eaf17890ab8d6bf7504355e4e32aaf2357b1398
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Mon Sep 25 13:00:13 2017 -0400

    Do not buffer rexi messages to disconnected nodes
    
    Instead, wait 15 seconds after the last cluster configuration change. If
    there were no further changes to the cluster in that time, stop the rexi
    buffers and servers for nodes which are no longer connected.
    
    Extract the cluster stability check from `couch_replicator_clustering` and
    move it to a new `mem3_cluster` module, so both the replicator and rexi can
    use it. Users of `mem3_cluster` implement its behaviour callback API and then
    start and link the cluster monitor (`mem3_cluster:start_link/4`) with their
    own period values.
    
    This also simplifies the logic in rexi_server_mon, as it no longer needs to
    handle `{nodeup, _}` and `{nodedown, _}` messages. On any cluster membership
    change it gets a `cluster_unstable` message, and it then immediately starts
    new servers and buffers if needed. Only once the cluster has stabilized does
    it stop the servers and buffers for disconnected nodes. The idea is to
    tolerate short disconnects between nodes before throwing away all the
    buffered messages.
---
 .../src/couch_replicator_clustering.erl            | 116 +++++----------
 src/mem3/src/mem3_cluster.erl                      | 161 +++++++++++++++++++++
 src/mem3/test/mem3_cluster_test.erl                | 133 +++++++++++++++++
 src/rexi/src/rexi_server_mon.erl                   |  84 ++++++++---
 4 files changed, 396 insertions(+), 98 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
index 7618f24..ed01465 100644
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator/src/couch_replicator_clustering.erl
@@ -28,6 +28,7 @@
 
 -behaviour(gen_server).
 -behaviour(config_listener).
+-behaviour(mem3_cluster).
 
 -export([
     start_link/0
@@ -55,6 +56,12 @@
     handle_config_terminate/3
 ]).
 
+% mem3_cluster callbacks
+-export([
+    cluster_stable/1,
+    cluster_unstable/1
+]).
+
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
@@ -63,11 +70,8 @@
 -define(RELISTEN_DELAY, 5000).
 
 -record(state, {
-    start_time :: erlang:timestamp(),
-    last_change :: erlang:timestamp(),
-    period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(),
-    start_period = ?DEFAULT_START_PERIOD :: non_neg_integer(),
-    timer :: reference()
+    % Linked mem3_cluster process started in init/1; quiet-period changes are
+    % forwarded to it with mem3_cluster:set_period/2.
+    mem3_cluster_pid :: pid(),
+    % Latest stability status, flipped by the cluster_stable / cluster_unstable
+    % casts and returned by handle_call(is_stable, ...).
+    cluster_stable :: boolean()
 }).
 
 
@@ -115,64 +119,55 @@ link_cluster_event_listener(Mod, Fun, Args)
     Pid.
 
 
+% Mem3 cluster callbacks
+
+% mem3_cluster callback, run inside the mem3_cluster process whenever node
+% membership changes: publish the event, zero the stability gauge, log, and
+% tell this gen_server. The context (this server's pid) is returned unchanged.
+cluster_unstable(Server) ->
+    Event = {cluster, unstable},
+    couch_replicator_notifier:notify(Event),
+    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
+    couch_log:notice("~s : cluster unstable", [?MODULE]),
+    ok = gen_server:cast(Server, cluster_unstable),
+    Server.
+
+% mem3_cluster callback, run inside the mem3_cluster process once membership
+% has been quiet for a full period: publish the event, set the stability gauge,
+% log, and tell this gen_server. The context is returned unchanged.
+cluster_stable(Server) ->
+    Event = {cluster, stable},
+    couch_replicator_notifier:notify(Event),
+    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
+    couch_log:notice("~s : cluster stable", [?MODULE]),
+    ok = gen_server:cast(Server, cluster_stable),
+    Server.
+
+
 % gen_server callbacks
 
+% Read the quiet / start periods from config and start the linked mem3_cluster
+% monitor that reports cluster stability changes back to this server.
 init([]) ->
-    net_kernel:monitor_nodes(true),
     ok = config:listen_for_changes(?MODULE, nil),
     Period = abs(config:get_integer("replicator", "cluster_quiet_period",
         ?DEFAULT_QUIET_PERIOD)),
     StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
         ?DEFAULT_START_PERIOD)),
-    couch_log:debug("Initialized clustering gen_server ~w", [self()]),
     couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
-    {ok, #state{
-        start_time = os:timestamp(),
-        last_change = os:timestamp(),
-        period = Period,
-        start_period = StartPeriod,
-        timer = new_timer(StartPeriod)
-    }}.
+    % Node up/down monitoring now lives in mem3_cluster, which invokes
+    % cluster_stable/1 and cluster_unstable/1 with this server's pid as context.
+    {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod,
+        Period),
+    % Report "not stable" until the first cluster_stable notification arrives.
+    {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}.
 
 
+% No cleanup is performed on shutdown.
 terminate(_Reason, _State) ->
     ok.
 
 
-handle_call(is_stable, _From, State) ->
-    {reply, is_stable(State), State}.
+% Reply with the last stability status reported by mem3_cluster.
+handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
+    {reply, IsStable, State}.
 
 
-handle_cast({set_period, QuietPeriod}, State) ->
-    {noreply, State#state{period = QuietPeriod}}.
+% Forward a new quiet period to the mem3_cluster monitor.
+handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
+    ok = mem3_cluster:set_period(Pid, Period),
+    {noreply, State};

+% Stability updates cast by the cluster_stable/1 and cluster_unstable/1
+% callbacks; only the cached flag answered by is_stable changes.
+handle_cast(cluster_stable, State) ->
+    {noreply, State#state{cluster_stable = true}};

-handle_info({nodeup, Node}, State) ->
-    Timer = new_timer(interval(State)),
-    couch_replicator_notifier:notify({cluster, unstable}),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
-    couch_log:notice("~s : nodeup ~s, cluster unstable", [?MODULE, Node]),
-    {noreply, State#state{last_change = os:timestamp(), timer = Timer}};
+handle_cast(cluster_unstable, State) ->
+    {noreply, State#state{cluster_stable = false}}.
 
-handle_info({nodedown, Node}, State) ->
-    Timer = new_timer(interval(State)),
-    couch_replicator_notifier:notify({cluster, unstable}),
-    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
-    couch_log:notice("~s : nodedown ~s, cluster unstable", [?MODULE, Node]),
-    {noreply, State#state{last_change = os:timestamp(), timer = Timer}};
-
-handle_info(stability_check, State) ->
-   erlang:cancel_timer(State#state.timer),
-   case is_stable(State) of
-       true ->
-           couch_replicator_notifier:notify({cluster, stable}),
-           couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
-           couch_log:notice("~s : publish cluster `stable` event", [?MODULE]),
-           {noreply, State};
-       false ->
-           Timer = new_timer(interval(State)),
-           {noreply, State#state{timer = Timer}}
-   end;
 
 handle_info(restart_config_listener, State) ->
     ok = config:listen_for_changes(?MODULE, nil),
@@ -185,41 +180,6 @@ code_change(_OldVsn, State, _Extra) ->
 
 %% Internal functions
 
--spec new_timer(non_neg_integer()) -> reference().
-new_timer(IntervalSec) ->
-    erlang:send_after(IntervalSec * 1000, self(), stability_check).
-
-
-% For the first Period seconds after node boot we check cluster stability every
-% StartPeriod seconds. Once the initial Period seconds have passed we continue
-% to monitor once every Period seconds
--spec interval(#state{}) -> non_neg_integer().
-interval(#state{period = Period, start_period = StartPeriod,
-        start_time = T0}) ->
-    case now_diff_sec(T0) > Period of
-        true ->
-            % Normal operation
-            Period;
-        false ->
-            % During startup
-            StartPeriod
-    end.
-
-
--spec is_stable(#state{}) -> boolean().
-is_stable(#state{last_change = TS} = State) ->
-    now_diff_sec(TS) > interval(State).
-
-
--spec now_diff_sec(erlang:timestamp()) -> non_neg_integer().
-now_diff_sec(Time) ->
-    case timer:now_diff(os:timestamp(), Time) of
-        USec when USec < 0 ->
-            0;
-        USec when USec >= 0 ->
-             USec / 1000000
-    end.
-
 
 handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
     ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}),
diff --git a/src/mem3/src/mem3_cluster.erl b/src/mem3/src/mem3_cluster.erl
new file mode 100644
index 0000000..7e3d477
--- /dev/null
+++ b/src/mem3/src/mem3_cluster.erl
@@ -0,0 +1,161 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Maintain cluster stability information. A cluster is considered stable if there
+% were no changes to it during a given period of time.
+%
+% To be notified of cluster stability / instability the owner module must
+% implement the mem3_cluster behavior. When cluster membership changes, the
+% cluster_unstable behavior callback will be called. Once there have been no
+% more changes to the cluster for a full period, the cluster_stable callback
+% will be called.
+%
+% The period is passed in as start argument but it can also be set dynamically
+% via the set_period/2 API call.
+%
+% In some cases it might be useful to have a shorter period during startup.
+% That can be configured via the StartPeriod argument. If the time since this
+% process started is less than a full period, then StartPeriod is used as the
+% period.
+
+
+-module(mem3_cluster).
+
+-behaviour(gen_server).
+
+-export([
+    start_link/4,
+    set_period/2
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-callback cluster_stable(Context :: term()) -> NewContext :: term().
+-callback cluster_unstable(Context :: term()) -> NewContext :: term().
+
+
+-record(state, {
+    % callback module implementing the mem3_cluster behaviour
+    mod :: atom(),
+    % context passed to the callbacks; replaced by what they return
+    ctx :: term(),
+    % when this process started (set in init/1)
+    start_time :: erlang:timestamp(),
+    % time of the last nodeup / nodedown (initially the start time)
+    last_change :: erlang:timestamp(),
+    % quiet period in seconds
+    period :: integer(),
+    % quiet period in seconds used during the startup window
+    start_period :: integer(),
+    % most recently armed stability_check timer
+    timer :: reference()
+}).
+
+
+% Start a cluster monitor linked to the caller. Module's callbacks are first
+% invoked with Context; StartPeriod and Period are given in seconds.
+-spec start_link(module(), term(), integer(), integer()) ->
+    {ok, pid()} | ignore | {error, term()}.
+start_link(Module, Context, StartPeriod, Period)
+        when is_atom(Module) andalso is_integer(StartPeriod)
+        andalso is_integer(Period) ->
+    InitArgs = [Module, Context, StartPeriod, Period],
+    gen_server:start_link(?MODULE, InitArgs, []).
+
+
+% Asynchronously replace the quiet period (seconds) of a running monitor.
+-spec set_period(pid(), integer()) -> ok.
+set_period(Server, Period) when is_pid(Server) andalso is_integer(Period) ->
+    Msg = {set_period, Period},
+    gen_server:cast(Server, Msg).
+
+
+% gen_server callbacks
+
+% Subscribe to node up/down events and arm the first stability check after
+% StartPeriod seconds. The start time doubles as the initial "last change".
+init([Module, Context, StartPeriod, Period]) ->
+    net_kernel:monitor_nodes(true),
+    StartTime = os:timestamp(),
+    LastChange = os:timestamp(),
+    {ok, #state{
+        mod = Module,
+        ctx = Context,
+        start_time = StartTime,
+        last_change = LastChange,
+        period = Period,
+        start_period = StartPeriod,
+        timer = new_timer(StartPeriod)
+    }}.
+
+
+% Nothing to release on shutdown.
+terminate(_Reason, _State) -> ok.
+
+% Synchronous requests are not supported: reply `ignored`, keep the state.
+handle_call(_Request, _From, State) ->
+    Reply = ignored,
+    {reply, Reply, State}.
+
+
+% The only supported cast replaces the quiet period; the new value takes
+% effect the next time interval/1 is evaluated.
+handle_cast({set_period, NewPeriod}, State) ->
+    NewState = State#state{period = NewPeriod},
+    {noreply, NewState}.
+
+
+% nodeup / nodedown (requested via net_kernel:monitor_nodes/1) mark the cluster
+% as changed. stability_check comes from the timer armed by new_timer/1: if the
+% cluster has been quiet for longer than the current interval, the callback
+% module is told it is stable; otherwise the timer is re-armed.
+handle_info({Event, _Node}, State)
+        when Event =:= nodeup; Event =:= nodedown ->
+    {noreply, cluster_changed(State)};
+
+handle_info(stability_check, #state{mod = Mod, ctx = Ctx, timer = Timer,
+        last_change = LastChange} = State) ->
+    erlang:cancel_timer(Timer),
+    IsQuiet = now_diff_sec(LastChange) > interval(State),
+    if
+        IsQuiet ->
+            {noreply, State#state{ctx = Mod:cluster_stable(Ctx)}};
+        true ->
+            {noreply, State#state{timer = new_timer(interval(State))}}
+    end.
+
+
+% Hot code upgrades keep the state as-is.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+
+%% Internal functions
+
+% Record a cluster membership change: tell the callback module the cluster is
+% unstable and restart the quiet-period countdown.
+%
+% The previously armed stability_check timer is cancelled first. Otherwise each
+% change leaves a stale timer behind; when it fires it cancels the current timer
+% and re-arms a full interval, delaying cluster_stable by up to one extra
+% interval. Cancelling an already-expired timer is harmless.
+-spec cluster_changed(#state{}) -> #state{}.
+cluster_changed(#state{mod = Mod, ctx = Ctx, timer = OldTimer} = State) ->
+    erlang:cancel_timer(OldTimer),
+    % Take the timestamp before arming the new timer so last_change never
+    % postdates the moment the countdown starts.
+    LastChange = os:timestamp(),
+    NewCtx = Mod:cluster_unstable(Ctx),
+    State#state{
+        last_change = LastChange,
+        timer = new_timer(interval(State)),
+        ctx = NewCtx
+    }.
+
+
+% Arm a one-shot timer that sends `stability_check` to the calling process
+% after IntervalSec seconds.
+-spec new_timer(non_neg_integer()) -> reference().
+new_timer(IntervalSec) ->
+    DelayMs = timer:seconds(IntervalSec),
+    erlang:send_after(DelayMs, self(), stability_check).
+
+
+% Return the quiet period (seconds) currently in force. Until this process has
+% been running for more than Period seconds (measured from start_time, set in
+% init/1), StartPeriod is used; after that, Period is used. The startup window
+% is compared against the current Period, so set_period/2 also moves it.
+-spec interval(#state{}) -> non_neg_integer().
+interval(#state{period = Period, start_period = StartPeriod,
+        start_time = T0}) ->
+    case now_diff_sec(T0) > Period of
+        true ->
+            % Normal operation
+            Period;
+        false ->
+            % Still within the startup window
+            StartPeriod
+    end.
+
+
+% Seconds elapsed since Time (an os:timestamp()). Non-negative differences are
+% returned as a float (USec / 1000000); if Time lies in the future, e.g. after
+% the OS clock was stepped back, the integer 0 is returned.
+-spec now_diff_sec(erlang:timestamp()) -> number().
+now_diff_sec(Time) ->
+    case timer:now_diff(os:timestamp(), Time) of
+        USec when USec < 0 ->
+            0;
+        USec ->
+            USec / 1000000
+    end.
diff --git a/src/mem3/test/mem3_cluster_test.erl b/src/mem3/test/mem3_cluster_test.erl
new file mode 100644
index 0000000..4610d64
--- /dev/null
+++ b/src/mem3/test/mem3_cluster_test.erl
@@ -0,0 +1,133 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_cluster_test).
+
+-behavior(mem3_cluster).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([
+    cluster_unstable/1,
+    cluster_stable/1
+]).
+
+
+% Mem3 cluster callbacks
+
+% Test callback: forward the notification to the test process.
+cluster_unstable(Server) ->
+    erlang:send(Server, cluster_unstable),
+    Server.
+
+% Test callback: forward the notification to the test process.
+cluster_stable(Server) ->
+    erlang:send(Server, cluster_stable),
+    Server.
+
+
+% Each test gets a fresh setup/teardown pair so a leftover registered monitor
+% from one test cannot leak into the next.
+mem3_cluster_test_test_() ->
+    Tests = [
+        t_cluster_stable_during_startup_period(),
+        t_cluster_unstable_delivered_on_nodeup(),
+        t_cluster_unstable_delivered_on_nodedown(),
+        t_wait_period_is_reset_after_last_change()
+    ],
+    {foreach, fun setup/0, fun teardown/1, Tests}.
+
+
+% With StartPeriod = 1s and Period = 2s the first stability check happens after
+% 1s, so cluster_stable must arrive within 1.5s of starting the monitor.
+t_cluster_stable_during_startup_period() ->
+    ?_test(begin
+        {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2),
+        register(?MODULE, Pid),
+        Got = receive
+            cluster_stable -> true
+        after 1500 ->
+            throw(timeout)
+        end,
+        ?assert(Got),
+        unlink(Pid),
+        exit(Pid, kill)
+    end).
+
+
+% A nodeup message must promptly trigger the cluster_unstable callback.
+t_cluster_unstable_delivered_on_nodeup() ->
+    ?_test(begin
+        {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2),
+        register(?MODULE, Pid),
+        erlang:send(Pid, {nodeup, node()}),
+        Got = receive
+            cluster_unstable -> true
+        after 1000 ->
+            throw(timeout)
+        end,
+        ?assert(Got),
+        unlink(Pid),
+        exit(Pid, kill)
+    end).
+
+
+% A nodedown message must promptly trigger the cluster_unstable callback.
+t_cluster_unstable_delivered_on_nodedown() ->
+    ?_test(begin
+        {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2),
+        register(?MODULE, Pid),
+        erlang:send(Pid, {nodedown, node()}),
+        Got = receive
+            cluster_unstable -> true
+        after 1000 ->
+            throw(timeout)
+        end,
+        ?assert(Got),
+        unlink(Pid),
+        exit(Pid, kill)
+    end).
+
+
+% With a 1s period, a nodeup sent at 800ms must push back the stability
+% notification: nothing may arrive in the following 400ms, but cluster_stable
+% must have arrived after another second of quiet time.
+t_wait_period_is_reset_after_last_change() ->
+    ?_test(begin
+        {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 1),
+        register(?MODULE, Pid),
+        timer:sleep(800),
+        Pid ! {nodeup, node()}, % after 800 ms send a nodeup
+        receive
+            cluster_stable ->
+                ?assert(false)
+            after 400 ->
+                ?assert(true) % stability check should have been reset
+        end,
+        timer:sleep(1000),
+        receive
+            cluster_stable ->
+                ?assert(true)
+            after 0 ->
+                ?assert(false) % cluster_stable arrives after enough quiet time
+        end,
+        unlink(Pid),
+        exit(Pid, kill)
+    end).
+
+
+% Test helper functions
+
+% No shared fixture is needed; each test starts its own monitor.
+setup() -> ok.
+
+% Kill the monitor a test registered under ?MODULE, if it is still alive.
+teardown(_) ->
+    Registered = whereis(?MODULE),
+    if
+        Registered =:= undefined ->
+            ok;
+        is_pid(Registered) ->
+            unlink(Registered),
+            exit(Registered, kill)
+    end.
diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl
index e6b5eb9..86fecaf 100644
--- a/src/rexi/src/rexi_server_mon.erl
+++ b/src/rexi/src/rexi_server_mon.erl
@@ -14,6 +14,7 @@
 
 -module(rexi_server_mon).
 -behaviour(gen_server).
+-behaviour(mem3_cluster).
 -vsn(1).
 
 
@@ -32,8 +33,13 @@
     code_change/3
 ]).
 
+-export([
+   cluster_stable/1,
+   cluster_unstable/1
+]).
 
--define(INTERVAL, 60000).
+
+-define(CLUSTER_STABILITY_PERIOD_SEC, 15).
 
 
 start_link(ChildMod) ->
@@ -45,9 +51,23 @@ status() ->
     gen_server:call(?MODULE, status).
 
 
+% Mem3 cluster callbacks
+
+% mem3_cluster callback (runs in the mem3_cluster process): log, forward the
+% notification to this server, and keep the context unchanged.
+cluster_unstable(Server) ->
+    couch_log:notice("~s : cluster unstable", [?MODULE]),
+    ok = gen_server:cast(Server, cluster_unstable),
+    Server.
+
+% mem3_cluster callback (runs in the mem3_cluster process): forward the
+% notification to this server and keep the context unchanged.
+cluster_stable(Server) ->
+    ok = gen_server:cast(Server, cluster_stable),
+    Server.
+
+
+% gen_server callbacks
+
 init(ChildMod) ->
-    net_kernel:monitor_nodes(true),
-    erlang:send(self(), check_nodes),
+    {ok, _Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(),
+        ?CLUSTER_STABILITY_PERIOD_SEC, ?CLUSTER_STABILITY_PERIOD_SEC),
     {ok, ChildMod}.
 
 
@@ -67,24 +87,27 @@ handle_call(Msg, _From, St) ->
     couch_log:notice("~s ignored_call ~w", [?MODULE, Msg]),
     {reply, ignored, St}.
 
-
-handle_cast(Msg, St) ->
-    couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]),
-    {noreply, St}.
-
-
-handle_info({nodeup, _}, ChildMod) ->
+% A cluster_unstable cast means a node was just added or removed. Start
+% servers for any newly connected nodes right away, but defer stopping the
+% servers of disconnected nodes until the cluster has stabilized.
+handle_cast(cluster_unstable, ChildMod) ->
+    couch_log:notice("~s : cluster unstable", [ChildMod]),
     start_servers(ChildMod),
     {noreply, ChildMod};

-handle_info({nodedown, _}, St) ->
-    {noreply, St};
-
-handle_info(check_nodes, ChildMod) ->
+% Once the cluster is stable, start servers for any new nodes and stop the
+% servers of nodes that are no longer connected.
+handle_cast(cluster_stable, ChildMod) ->
+    couch_log:notice("~s : cluster stable", [ChildMod]),
     start_servers(ChildMod),
-    erlang:send_after(?INTERVAL, self(), check_nodes),
+    stop_servers(ChildMod),
     {noreply, ChildMod};

+% Log and ignore any other cast.
+handle_cast(Msg, St) ->
+    couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+% Log and ignore every raw message; cluster changes now arrive as
+% cluster_stable / cluster_unstable casts instead of nodeup / nodedown.
 handle_info(Msg, St) ->
     couch_log:notice("~s ignored_info ~w", [?MODULE, Msg]),
     {noreply, St}.
@@ -101,13 +124,27 @@ start_servers(ChildMod) ->
         {ok, _} = start_server(ChildMod, Id)
     end, missing_servers(ChildMod)).
 
+% Stop and remove the supervised servers whose node is no longer connected;
+% crashes if any of them cannot be stopped cleanly.
+stop_servers(ChildMod) ->
+    Extra = extra_servers(ChildMod),
+    _ = [ok = stop_server(ChildMod, Id) || Id <- Extra],
+    ok.
+
+
+% Expected child ids: one '<ChildMod>_<Node>' atom for this node followed by
+% one for each connected node.
+server_ids(ChildMod) ->
+    lists:map(fun(Node) ->
+        list_to_atom(lists:concat([ChildMod, "_", Node]))
+    end, [node() | nodes()]).
+
+
+% Ids of the children currently known to ChildMod's supervisor.
+running_servers(ChildMod) ->
+    Children = supervisor:which_children(sup_module(ChildMod)),
+    [Id || {Id, _Child, _Type, _Modules} <- Children].
+
 
+% Servers expected for the connected nodes that the supervisor is not running.
 missing_servers(ChildMod) ->
-    ServerIds = [list_to_atom(lists:concat([ChildMod, "_", Node]))
-        || Node <- [node() | nodes()]],
-    SupModule = sup_module(ChildMod),
-    ChildIds = [Id || {Id, _, _, _} <- supervisor:which_children(SupModule)],
-    ServerIds -- ChildIds.
+    server_ids(ChildMod) -- running_servers(ChildMod).
+
+
+% Servers the supervisor is running for nodes that are no longer connected.
+extra_servers(ChildMod) ->
+    Running = running_servers(ChildMod),
+    Expected = server_ids(ChildMod),
+    Running -- Expected.
 
 
 start_server(ChildMod, ChildId) ->
@@ -126,5 +163,12 @@ start_server(ChildMod, ChildId) ->
             erlang:error(Else)
     end.
 
+
+% Terminate ChildId under ChildMod's supervisor and delete its child spec;
+% crashes (badmatch) unless both supervisor calls return ok.
+stop_server(ChildMod, ChildId) ->
+    Sup = sup_module(ChildMod),
+    ok = supervisor:terminate_child(Sup, ChildId),
+    ok = supervisor:delete_child(Sup, ChildId).
+
+
+% Name of the supervisor for ChildMod's servers: ChildMod with "_sup" appended
+% (e.g. foo -> foo_sup).
 sup_module(ChildMod) ->
     list_to_atom(lists:concat([ChildMod, "_sup"])).

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].