You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/09/21 21:30:20 UTC

[couchdb] branch main updated: Explicitly maintain a fully connected cluster

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new c4e8f5956 Explicitly maintain a fully connected cluster
c4e8f5956 is described below

commit c4e8f59569d35bb02538a446d6ab8bcc4bdddf10
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Mon Sep 19 17:39:19 2022 -0400

    Explicitly maintain a fully connected cluster
    
    Previously, it was possible for the nodes to disconnect, and for that state to
    persist until the nodes restarted. Some fabric requests could reconnect the
    nodes as a side-effect of sending remote messages, but most of the fabric
    requests currently start a rexi monitor, which immediately delivers a
    `rexi_DOWN` message to the coordinator for worker nodes not in the `[node() |
    nodes()]` list [1]. That happens before `erlang:send/2,3` gets called, so there is
    nothing there to eventually reconnect the nodes.
    
    To avoid relying on random requests to reconnect the cluster, use an
    explicit monitor process. It makes the initial connections and then
    periodically maintains them.
    
    [1] https://github.com/apache/couchdb/blob/main/src/rexi/src/rexi_monitor.erl#L45
---
 rel/overlay/etc/default.ini                    |  5 ++
 src/docs/src/config/cluster.rst                |  8 +++
 src/mem3/src/mem3_distribution.erl             | 93 ++++++++++++++++++++++++++
 src/mem3/src/mem3_sup.erl                      |  1 +
 src/mem3/src/mem3_sync.erl                     |  1 -
 src/mem3/src/mem3_sync_event.erl               |  2 +-
 src/mem3/test/eunit/mem3_distribution_test.erl | 74 ++++++++++++++++++++
 7 files changed, 182 insertions(+), 2 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index b989ba3fa..1b1f6111d 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -124,6 +124,11 @@ couch = couch_bt_engine
 ; of the ``_nodes``, ``_dbs``, and ``_users`` system databases.
 ; seedlist = couchdb@node1.example.com,couchdb@node2.example.com
 
+; Period in seconds specifying how often to attempt reconnecting to
+; disconnected nodes. A random jitter of up to 25% of this value is
+; added to each wait.
+;reconnect_interval_sec = 37
+
 [chttpd]
 ; These settings affect the main, clustered port (5984 by default).
 port = {{cluster_port}}
diff --git a/src/docs/src/config/cluster.rst b/src/docs/src/config/cluster.rst
index 8801b9cde..72959829c 100644
--- a/src/docs/src/config/cluster.rst
+++ b/src/docs/src/config/cluster.rst
@@ -87,6 +87,14 @@ Cluster Options
         [cluster]
         seedlist = couchdb@node1.example.com,couchdb@node2.example.com
 
+    .. config:option:: reconnect_interval_sec :: Cluster connectivity check period.
+
+        .. versionadded:: 3.3
+
+        Period in seconds specifying how often to attempt reconnecting to
+        disconnected nodes. A random jitter of up to 25% of this value is
+        added to each wait.
+
 RPC Performance Tuning
 ======================
 
diff --git a/src/mem3/src/mem3_distribution.erl b/src/mem3/src/mem3_distribution.erl
new file mode 100644
index 000000000..a2b77de8a
--- /dev/null
+++ b/src/mem3/src/mem3_distribution.erl
@@ -0,0 +1,133 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% This module is in charge of keeping the cluster connected. Nodes returned
+% by mem3:nodes/0 which are not in nodes() are connected at startup and then
+% retried periodically (every [cluster] reconnect_interval_sec seconds plus
+% random jitter) with net_kernel:connect_node/1.
+
+-module(mem3_distribution).
+
+-behaviour(gen_server).
+
+-export([
+    start_link/0,
+    connect_node/1
+]).
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+% Upper bound of the random jitter, as a fraction of the base interval.
+-define(JITTER_PERCENT, 0.25).
+
+% Default for [cluster] reconnect_interval_sec, in seconds.
+-define(DEFAULT_INTERVAL_SEC, 37).
+
+% Smallest accepted reconnect interval, in seconds.
+-define(MIN_INTERVAL_SEC, 1).
+
+% tref: reference of the pending `connect` timer.
+-record(st, {
+    tref
+}).
+
+% Start the connection keeper, registered locally as ?MODULE.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+% Try to connect to Node and return the result of
+% net_kernel:connect_node/1. connect/2 calls this as ?MODULE:connect_node/1
+% (an external call), which lets tests mock it.
+connect_node(Node) when is_atom(Node) ->
+    net_kernel:connect_node(Node).
+
+% Connect to all expected but unconnected nodes (without logging), then arm
+% the periodic reconnect timer. The initial connects run synchronously, so
+% they complete before init/1 returns.
+init(_) ->
+    connect(false),
+    {ok, #st{tref = schedule_connect()}}.
+
+% No calls or casts are expected; stop on any.
+handle_call(Msg, _From, #st{} = St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+handle_cast(Msg, #st{} = St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+% `connect` is normally sent by the timer armed in schedule_connect/0. The
+% pending timer is cancelled first in case the message was sent explicitly
+% (cancelling an already expired timer is harmless), then unconnected nodes
+% are reconnected with logging and the next attempt is scheduled.
+handle_info(connect, #st{} = St) ->
+    erlang:cancel_timer(St#st.tref),
+    ok = connect(true),
+    {noreply, St#st{tref = schedule_connect()}};
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+code_change(_OldVsn, #st{} = St, _Extra) ->
+    {ok, St}.
+
+% Arm a timer which sends `connect` to this process after wait_msec/0 ms.
+schedule_connect() ->
+    erlang:send_after(wait_msec(), self(), connect).
+
+% Connect to every node in mem3:nodes/0, other than this node, that is not
+% currently connected. Log = true logs each successful reconnection.
+connect(Log) ->
+    Expected = ordsets:from_list([N || N <- mem3:nodes(), N =/= node()]),
+    Connected = ordsets:from_list(nodes()),
+    NotConnected = ordsets:subtract(Expected, Connected),
+    connect(ordsets:to_list(NotConnected), Log).
+
+connect([], _Log) ->
+    ok;
+connect([N | Rest], Log) ->
+    % External call so that tests can mock connect_node/1
+    ConnectRes = ?MODULE:connect_node(N),
+    log(Log, ConnectRes, N),
+    connect(Rest, Log).
+
+% Emit a warning only when logging is enabled and connect_node/1 returned
+% true. Any other result is not logged.
+log(true, true, Node) ->
+    couch_log:warning("~s : reconnected to ~s", [?MODULE, Node]),
+    ok;
+log(_, _, _) ->
+    % Failed to connect or we don't want to log it
+    ok.
+
+% Milliseconds until the next reconnect attempt: the configured interval plus
+% a random jitter of up to ?JITTER_PERCENT of it. The interval is clamped to
+% at least ?MIN_INTERVAL_SEC: 0 would retry roughly every millisecond, and a
+% negative value would make erlang:send_after/3 fail with badarg, crashing
+% the server again on every restart.
+wait_msec() ->
+    IntervalSec = config:get_integer(
+        "cluster", "reconnect_interval_sec", ?DEFAULT_INTERVAL_SEC
+    ),
+    IntervalMSec = max(?MIN_INTERVAL_SEC, IntervalSec) * 1000,
+    IntervalMSec + jitter(IntervalMSec).
+
+% Random jitter in the range 1..round(Interval * ?JITTER_PERCENT) ms; the
+% max/2 keeps the range non-empty for very small intervals.
+jitter(Interval) ->
+    Jitter = round(Interval * ?JITTER_PERCENT),
+    % rand:uniform(0) crashes!
+    rand:uniform(max(1, Jitter)).
diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl
index a2dc5ba8d..862ef6b50 100644
--- a/src/mem3/src/mem3_sup.erl
+++ b/src/mem3/src/mem3_sup.erl
@@ -21,6 +21,7 @@ init(_Args) ->
     Children = [
         child(mem3_events),
         child(mem3_nodes),
+        child(mem3_distribution),
         child(mem3_seeds),
         % Order important?
         child(mem3_sync_nodes),
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
index 3d1c18420..179435965 100644
--- a/src/mem3/src/mem3_sync.erl
+++ b/src/mem3/src/mem3_sync.erl
@@ -266,7 +266,6 @@ sync_nodes_and_dbs() ->
     [push(Db, Node) || Db <- local_dbs()].
 
 initial_sync() ->
-    [net_kernel:connect_node(Node) || Node <- mem3:nodes()],
     mem3_sync_nodes:add(nodes()).
 
 initial_sync(Live) ->
diff --git a/src/mem3/src/mem3_sync_event.erl b/src/mem3/src/mem3_sync_event.erl
index ec6debb45..de9d3e74e 100644
--- a/src/mem3/src/mem3_sync_event.erl
+++ b/src/mem3/src/mem3_sync_event.erl
@@ -28,7 +28,7 @@ init(_) ->
     {ok, nil}.
 
 handle_event({add_node, Node}, State) when Node =/= node() ->
-    net_kernel:connect_node(Node),
+    mem3_distribution:connect_node(Node),
     mem3_sync_nodes:add([Node]),
     {ok, State};
 handle_event({remove_node, Node}, State) ->
diff --git a/src/mem3/test/eunit/mem3_distribution_test.erl b/src/mem3/test/eunit/mem3_distribution_test.erl
new file mode 100644
index 000000000..d442a47b3
--- /dev/null
+++ b/src/mem3/test/eunit/mem3_distribution_test.erl
@@ -0,0 +1,89 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_distribution_test).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end).
+-define(MOD, mem3_distribution).
+% Format string mem3_distribution uses for its reconnection warning. Matching
+% on it keeps unrelated couch_log:warning/2 calls from affecting the counts.
+-define(RECONNECT_FMT, "~s : reconnected to ~s").
+
+setup() ->
+    Ctx = test_util:start_couch([mem3]),
+    meck:new(mem3, [passthrough]),
+    meck:new(mem3_distribution, [passthrough]),
+    meck:new(couch_log, [passthrough]),
+    Ctx.
+
+teardown(Ctx) ->
+    meck:unload(),
+    test_util:stop_couch(Ctx).
+
+mem3_distribution_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(periodic_scheduler_works),
+            ?TDEF_FE(connect_to_unconnected_nodes)
+        ]
+    }.
+
+periodic_scheduler_works(_) ->
+    St = sys:get_state(?MOD),
+    {st, TRef} = St,
+    TVal = erlang:read_timer(TRef),
+    ?assert(is_integer(TVal)),
+    ?assert(TVal > 0),
+    ?assert(TVal =< 70000),
+    % Calling the callback directly runs it in the test process: it cancels
+    % the server's timer and arms a new one that targets this test process.
+    {noreply, St1} = ?MOD:handle_info(connect, St),
+    {st, TRef1} = St1,
+    ?assertNotEqual(TRef, TRef1),
+    TVal1 = erlang:read_timer(TRef1),
+    ?assert(is_integer(TVal1)),
+    % Don't leave a stray `connect` message pending after the test
+    erlang:cancel_timer(TRef1).
+
+connect_to_unconnected_nodes(_) ->
+    Nodes = ['foo', 'bar'],
+    meck:expect(mem3, nodes, 0, Nodes),
+    meck:reset(?MOD),
+    % Simulate connect timer expiry
+    ?MOD ! connect,
+    sync_server(),
+    ?assert(meck:called(?MOD, connect_node, [foo])),
+    ?assert(meck:called(?MOD, connect_node, [bar])),
+    % The real connect_node/1 doesn't return true for these fake nodes, so
+    % there should be no reconnection log
+    ?assertEqual(0, meck:num_calls(couch_log, warning, [?RECONNECT_FMT, '_'])),
+    % Make connect return true
+    meck:reset(?MOD),
+    meck:expect(?MOD, connect_node, 1, true),
+    % Simulate connect timer expiry
+    ?MOD ! connect,
+    sync_server(),
+    ?assert(meck:called(?MOD, connect_node, [foo])),
+    ?assert(meck:called(?MOD, connect_node, [bar])),
+    % connect_node returns true => one reconnection log per node
+    ?assertEqual(2, meck:num_calls(couch_log, warning, [?RECONNECT_FMT, '_'])).
+
+% Block until the server has handled every message this process sent it
+% earlier: sys:get_state/1 is answered in mailbox order, after `connect`.
+sync_server() ->
+    _ = sys:get_state(?MOD),
+    ok.