You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:23:34 UTC

[24/33] rexi commit: updated refs/heads/master to bbf59a2

Configure buffer limit by message count

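This allows an operator to decide how large the buffers should be, by
setting the maximum number of buffered messages with the "buffer_count"
option in the "rexi" config section (default 2000). It also provides an
escape valve, the erase_buffer call, to clear the buffer entirely.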


Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/cd07cb8c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/cd07cb8c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/cd07cb8c

Branch: refs/heads/master
Commit: cd07cb8c04df1510253718df7f63d6783e3ec0a7
Parents: 23cda37
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Tue Jun 3 14:25:59 2014 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:08:01 2014 +0100

----------------------------------------------------------------------
 src/rexi_buffer.erl | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/cd07cb8c/src/rexi_buffer.erl
----------------------------------------------------------------------
diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl
index 874ec3c..f75399c 100644
--- a/src/rexi_buffer.erl
+++ b/src/rexi_buffer.erl
@@ -29,9 +29,6 @@
     count = 0
 }).
 
-%% TODO Leverage os_mon to discover available memory in the system
--define (MAX_MEMORY, 17179869184).
-
 start_link(ServerId) ->
     gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
 
@@ -41,7 +38,12 @@ send(Dest, Msg) ->
 
 
 init(_) ->
-    {ok, #state{}}.
+    %% TODO Leverage os_mon to discover available memory in the system
+    Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
+    {ok, #state{max_count = Max}}.
+
+handle_call(erase_buffer, _From, State) ->
+    {reply, ok, State#state{buffer = queue:new(), count = 0}, 0};
 
 handle_call(get_buffered_count, _From, State) ->
     {reply, State#state.count, State, 0}.
@@ -49,7 +51,7 @@ handle_call(get_buffered_count, _From, State) ->
 handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
     margaret_counter:increment([erlang, rexi, buffered]),
     Q2 = queue:in({Dest, Msg}, Q),
-    case should_drop() of
+    case should_drop(State) of
     true ->
             {noreply, State#state{buffer = queue:drop(Q2)}, 0};
     false ->
@@ -94,11 +96,14 @@ handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) ->
 terminate(_Reason, _State) ->
     ok.
 
+code_change(_OldVsn, {state, Buffer, Sender, Count}, _Extra) ->
+    Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
+    {ok, #state{buffer=Buffer, sender=Sender, count=Count, max_count=Max}};
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-should_drop() ->
-    erlang:memory(total) > ?MAX_MEMORY.
+should_drop(#state{count = Count, max_count = Max}) ->
+    Count >= Max.
 
 get_node({_, Node}) when is_atom(Node) ->
     Node;