You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:23:24 UTC

[14/33] rexi commit: updated refs/heads/master to bbf59a2

Rejigger the governor implementation

Previously we'd spawn a separate process for each message that had to
be sent to a 'noconnect' / 'nosuspend' node.  Now each governor buffers
the messages that need to be sent directly in its own state and sends
them one at a time.  This prevents the net_kernel from tipping over in
the noconnect case.  We also decide whether to drop messages based on
memory consumption in the node instead of process limits (since we're
not spawning anymore).

BugzID: 23717
BugzID: 23718


Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/1030906b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/1030906b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/1030906b

Branch: refs/heads/master
Commit: 1030906b85de7a398baf6363b9d3f6f21e93b257
Parents: 11b3859
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Tue Oct 15 22:46:52 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:02:32 2014 +0100

----------------------------------------------------------------------
 src/rexi_governor.erl | 97 ++++++++++++++++++++++++++--------------------
 src/rexi_utils.erl    |  8 +++-
 2 files changed, 62 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/1030906b/src/rexi_governor.erl
----------------------------------------------------------------------
diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl
index 876165d..fdf8c93 100644
--- a/src/rexi_governor.erl
+++ b/src/rexi_governor.erl
@@ -18,54 +18,67 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {pids = ets:new(pids, [set]),
-                spawn_max = 10000,
-                spawn_cnt = 0,
-                drop_cnt = 0}).
-
-init([PidSpawnMax]) ->
-    {ok, #state{spawn_max = PidSpawnMax}}.
-
-handle_call(_Request, _From, State) ->
-    Reply = ok,
-    {reply, Reply, State}.
-
-handle_cast({spawn_and_track, Dest, Msg},
-            #state{pids = Pids,
-                   spawn_max = SpawnMax,
-                   spawn_cnt = SC,
-                   drop_cnt = DC} = State) ->
-    {NewSC, NewDC} =
-    case ets:info(Pids, size) < SpawnMax of
-    true ->
-        {Pid, Ref} = spawn_monitor(erlang, send, [Dest, Msg]),
-        ets:insert(Pids, {Pid, Ref}),
-        {SC + 1, DC};
-    false ->
-        % drop message on floor
-        {SC, DC + 1}
-    end,
-    {noreply, State#state{spawn_cnt = NewSC, drop_cnt = NewDC}};
+-export ([
+    send/2,
+    start_link/1
+]).
+
+-record(state, {
+    buffer = queue:new(),
+    count = 0
+}).
 
-handle_cast(nodeout, #state{pids = Pids} = State) ->
-    % kill all the pids
-    ets:foldl(fun({P, _Ref}, Acc) ->
-                  exit(P, kill),
-                  Acc
-              end, [], Pids),
-    ets:delete_all_objects(Pids),
-    {noreply, State}.
+%% TODO Leverage os_mon to discover available memory in the system
+-define (MAX_MEMORY, 17179869184).
 
-handle_info({'DOWN', _, process, Pid, normal},
-            #state{pids = Pids} = State) ->
-    ets:delete(Pids, Pid),
-    {noreply, State};
+start_link(ServerId) ->
+    gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
 
-handle_info({'DOWN', _, process, _Pid, killed}, State) ->
-    {noreply, State}.
+send(Dest, Msg) ->
+    Server = list_to_atom(lists:concat([rexi_governor, "_", get_node(Dest)])),
+    gen_server:cast(Server, {deliver, Dest, Msg}).
+
+
+init(_) ->
+    {ok, #state{}}.
+
+handle_call(get_buffered_count, _From, State) ->
+    {reply, State#state.count, State, 0}.
+
+handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
+    margaret_counter:increment([erlang, rexi, buffered]),
+    Q2 = queue:in({Dest, Msg}, Q),
+    case should_drop() of
+    true ->
+            {noreply, State#state{buffer = queue:drop(Q2)}, 0};
+    false ->
+            {noreply, State#state{buffer = Q2, count = C+1}, 0}
+    end.
+
+handle_info(timeout, State) ->
+    #state{buffer = Q, count = C} = State,
+    case queue:out_r(Q) of
+        {{value, {Dest, Msg}}, Q2} ->
+            erlang:send(Dest, Msg);
+        {empty, Q2} ->
+            ok
+    end,
+    if C > 1 ->
+        {noreply, State#state{buffer = Q2, count = C-1}, 0};
+    true ->
+        {noreply, State#state{buffer = Q2, count = 0}}
+    end.
 
 terminate(_Reason, _State) ->
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
+
+should_drop() ->
+    erlang:memory(total) > ?MAX_MEMORY.
+
+get_node({_, Node}) when is_atom(Node) ->
+    Node;
+get_node(Pid) when is_pid(Pid) ->
+    node(Pid).

http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/1030906b/src/rexi_utils.erl
----------------------------------------------------------------------
diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl
index 6e03757..3c89ca9 100644
--- a/src/rexi_utils.erl
+++ b/src/rexi_utils.erl
@@ -29,7 +29,13 @@ server_pid(Node) ->
 
 %% @doc send a message as quickly as possible
 send(Dest, Msg) ->
-    rexi_gov_manager:send(Dest, Msg).
+    case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
+    ok ->
+        ok;
+    _ ->
+        % treat nosuspend and noconnect the same
+        rexi_governor:send(Dest, Msg)
+    end.
 
 %% @doc set up the receive loop with an overall timeout
 -spec recv([any()], integer(), function(), any(), timeout(), timeout()) ->