You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/09/19 21:51:29 UTC

[couchdb] branch reconnect-cluster-nodes-explicitly created (now 8a627cc51)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch reconnect-cluster-nodes-explicitly
in repository https://gitbox.apache.org/repos/asf/couchdb.git


      at 8a627cc51 Explicitly maintain a fully connected cluster

This branch includes the following new commits:

     new 8a627cc51 Explicitly maintain a fully connected cluster

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: Explicitly maintain a fully connected cluster

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch reconnect-cluster-nodes-explicitly
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 8a627cc51e7ed07ab51c7343e33274569fa1b0fc
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Mon Sep 19 17:39:19 2022 -0400

    Explicitly maintain a fully connected cluster
    
    Previously, it was possible for the nodes to disconnect and for that state to
    persist util the nodes restarted. Some fabric requests could reconnect the
    nodes as a side-effect of sending remote messages, but most of the fabric
    requests currently start a rexi monitor, which immediately delivers a
    `rexi_DOWN` message to the coordinator for worker nodes not in the `[node() |
    nodes()]` list. That happens before `erlang:send/2,3` gets called so there is
    nothing there to eventually reconnect the nodes.
    
    To avoid relying on the random request reconnecting the cluster, use an
    explicit monitor process. It does the initial connections, as well as
    periodically maintains them.
    
    [1] https://github.com/apache/couchdb/blob/main/src/rexi/src/rexi_monitor.erl#L45
---
 src/mem3/src/mem3_distribution.erl | 74 ++++++++++++++++++++++++++++++++++++++
 src/mem3/src/mem3_sup.erl          |  1 +
 src/mem3/src/mem3_sync.erl         |  1 -
 src/mem3/src/mem3_sync_event.erl   |  2 +-
 4 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/src/mem3/src/mem3_distribution.erl b/src/mem3/src/mem3_distribution.erl
new file mode 100644
index 000000000..15f56bf7c
--- /dev/null
+++ b/src/mem3/src/mem3_distribution.erl
@@ -0,0 +1,74 @@
+-module(mem3_distribution).
+
+-behaviour(gen_server).
+
+-export([
+    start_link/0,
+    connect_node/1
+]).
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-record(st, {
+    tref
+}).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+connect_node(Node) when is_atom(Node) ->
+    net_kernel:connect_node(Node).
+
+init(_) ->
+    connect(false),
+    {ok, #st{tref = erlang:send_after(wait_msec(), connect, self())}}.
+
+handle_call(Msg, _From, #st{} = St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+handle_cast(Msg, #st{} = St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+handle_info(connect, #st{} = St) ->
+    erlang:cancel_timer(St#st.tref),
+    ok = connect(true),
+    {noreply, St#st{tref = erlang:send_after(wait_msec(), connect, self())}};
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+code_change(_OldVsn, #st{} = St, _Extra) ->
+    {ok, St}.
+
+connect(Log) ->
+    Expected = [N || N <- mem3:nodes(), N =/= node()],
+    Nodes = nodes(),
+    NotConnected = [N || N <- Expected, not lists:member(N, Nodes)],
+    connect(NotConnected, Log).
+
+connect([], _Log) ->
+    ok;
+connect([N | Rest], Log) ->
+    ConnectRes = connect_node(N),
+    log(Log, ConnectRes, N),
+    connect(Rest, Log).
+
+log(true, true, Node) ->
+    couch_log:notice("~s : connected to ~s", [?MODULE, Node]),
+    ok;
+log(_, _, _) ->
+    % Failed to connect or we don't want to log it
+    ok.
+
+wait_msec() ->
+    IntervalSec = config:get_integer("mem3", "reconnect_interval_sec", 37),
+    IntervalSec * 1000 + rand:uniform(jitter_msec()).
+
+jitter_msec() ->
+    JitterSec = config:get_integer("mem3", "reconnect_jitter_sec", 17),
+    JitterSec * 1000.
diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl
index a2dc5ba8d..862ef6b50 100644
--- a/src/mem3/src/mem3_sup.erl
+++ b/src/mem3/src/mem3_sup.erl
@@ -21,6 +21,7 @@ init(_Args) ->
     Children = [
         child(mem3_events),
         child(mem3_nodes),
+        child(mem3_distribution),
         child(mem3_seeds),
         % Order important?
         child(mem3_sync_nodes),
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
index 3d1c18420..179435965 100644
--- a/src/mem3/src/mem3_sync.erl
+++ b/src/mem3/src/mem3_sync.erl
@@ -266,7 +266,6 @@ sync_nodes_and_dbs() ->
     [push(Db, Node) || Db <- local_dbs()].
 
 initial_sync() ->
-    [net_kernel:connect_node(Node) || Node <- mem3:nodes()],
     mem3_sync_nodes:add(nodes()).
 
 initial_sync(Live) ->
diff --git a/src/mem3/src/mem3_sync_event.erl b/src/mem3/src/mem3_sync_event.erl
index ec6debb45..de9d3e74e 100644
--- a/src/mem3/src/mem3_sync_event.erl
+++ b/src/mem3/src/mem3_sync_event.erl
@@ -28,7 +28,7 @@ init(_) ->
     {ok, nil}.
 
 handle_event({add_node, Node}, State) when Node =/= node() ->
-    net_kernel:connect_node(Node),
+    mem3_distribution:connect_node(Node),
     mem3_sync_nodes:add([Node]),
     {ok, State};
 handle_event({remove_node, Node}, State) ->